1 /*
2 * Copyright (C) 2007 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define TRACE_TAG TRANSPORT
18
19 #include "sysdeps.h"
20
21 #include "transport.h"
22
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30
31 #include <algorithm>
32 #include <list>
33 #include <memory>
34 #include <mutex>
35 #include <set>
36 #include <thread>
37
38 #include <adb/crypto/rsa_2048_key.h>
39 #include <adb/crypto/x509_generator.h>
40 #include <adb/tls/tls_connection.h>
41 #include <android-base/logging.h>
42 #include <android-base/no_destructor.h>
43 #include <android-base/parsenetaddress.h>
44 #include <android-base/stringprintf.h>
45 #include <android-base/strings.h>
46 #include <android-base/thread_annotations.h>
47 #include <diagnose_usb.h>
48
49 #include "adb.h"
50 #include "adb_auth.h"
51 #include "adb_io.h"
52 #include "adb_trace.h"
53 #include "adb_utils.h"
54 #include "fdevent/fdevent.h"
55 #include "sysdeps/chrono.h"
56
57 #if ADB_HOST
58 #include "client/usb.h"
59 #endif
60
61 using namespace adb::crypto;
62 using namespace adb::tls;
63 using android::base::ScopedLockAssertion;
64 using TlsError = TlsConnection::TlsError;
65
66 static void remove_transport(atransport* transport);
67 static void transport_destroy(atransport* transport);
68
69 // TODO: unordered_map<TransportId, atransport*>
70 static auto& transport_list = *new std::list<atransport*>();
71 static auto& pending_list = *new std::list<atransport*>();
72
73 static auto& transport_lock = *new std::recursive_mutex();
74
// Feature names, one string constant per feature.
// NOTE(review): presumably these are the tokens exchanged when the two sides advertise their
// supported features — confirm against the code that consumes these constants.
75 const char* const kFeatureShell2 = "shell_v2";
76 const char* const kFeatureCmd = "cmd";
77 const char* const kFeatureStat2 = "stat_v2";
78 const char* const kFeatureLs2 = "ls_v2";
79 const char* const kFeatureLibusb = "libusb";
80 const char* const kFeaturePushSync = "push_sync";
81 const char* const kFeatureApex = "apex";
82 const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
83 const char* const kFeatureAbb = "abb";
84 const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
85 const char* const kFeatureAbbExec = "abb_exec";
86 const char* const kFeatureRemountShell = "remount_shell";
87 const char* const kFeatureTrackApp = "track_app";
88 const char* const kFeatureSendRecv2 = "sendrecv_v2";
89 const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
90 const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
91 const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd";
92 const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send";
93 // TODO(joshuaduong): Bump to v2 when openscreen discovery is enabled by default
94 const char* const kFeatureOpenscreenMdns = "openscreen_mdns";
95
96 namespace {
97
98 #if ADB_HOST
99
100 // Tracks and handles atransport*s that are attempting reconnection.
101 class ReconnectHandler {
102 public:
103 ReconnectHandler() = default;
104 ~ReconnectHandler() = default;
105
106 // Starts the ReconnectHandler thread.
107 void Start();
108
109 // Requests the ReconnectHandler thread to stop.
110 void Stop();
111
112 // Adds the atransport* to the queue of reconnect attempts.
113 void TrackTransport(atransport* transport);
114
115 // Wake up the ReconnectHandler thread to have it check for kicked transports.
116 void CheckForKicked();
117
118 private:
119 // The main thread loop.
120 void Run();
121
122 // Tracks a reconnection attempt.
123 struct ReconnectAttempt {
124 atransport* transport;
125 std::chrono::steady_clock::time_point reconnect_time;
126 size_t attempts_left;
127
operator <__anoneaa291550111::ReconnectHandler::ReconnectAttempt128 bool operator<(const ReconnectAttempt& rhs) const {
129 if (reconnect_time == rhs.reconnect_time) {
130 return reinterpret_cast<uintptr_t>(transport) <
131 reinterpret_cast<uintptr_t>(rhs.transport);
132 }
133 return reconnect_time < rhs.reconnect_time;
134 }
135 };
136
137 // Only retry for up to one minute.
138 static constexpr const std::chrono::seconds kDefaultTimeout = 3s;
139 static constexpr const size_t kMaxAttempts = 20;
140
141 // Protects all members.
142 std::mutex reconnect_mutex_;
143 bool running_ GUARDED_BY(reconnect_mutex_) = true;
144 std::thread handler_thread_;
145 std::condition_variable reconnect_cv_;
146 std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
147
148 DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
149 };
150
Start()151 void ReconnectHandler::Start() {
152 check_main_thread();
153 handler_thread_ = std::thread(&ReconnectHandler::Run, this);
154 }
155
Stop()156 void ReconnectHandler::Stop() {
157 check_main_thread();
158 {
159 std::lock_guard<std::mutex> lock(reconnect_mutex_);
160 running_ = false;
161 }
162 reconnect_cv_.notify_one();
163 handler_thread_.join();
164
165 // Drain the queue to free all resources.
166 std::lock_guard<std::mutex> lock(reconnect_mutex_);
167 while (!reconnect_queue_.empty()) {
168 ReconnectAttempt attempt = *reconnect_queue_.begin();
169 reconnect_queue_.erase(reconnect_queue_.begin());
170 remove_transport(attempt.transport);
171 }
172 }
173
TrackTransport(atransport * transport)174 void ReconnectHandler::TrackTransport(atransport* transport) {
175 check_main_thread();
176 {
177 std::lock_guard<std::mutex> lock(reconnect_mutex_);
178 if (!running_) return;
179 // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
180 auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
181 reconnect_queue_.emplace(
182 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
183 }
184 reconnect_cv_.notify_one();
185 }
186
CheckForKicked()187 void ReconnectHandler::CheckForKicked() {
188 reconnect_cv_.notify_one();
189 }
190
Run()191 void ReconnectHandler::Run() {
192 while (true) {
193 ReconnectAttempt attempt;
194 {
195 std::unique_lock<std::mutex> lock(reconnect_mutex_);
196 ScopedLockAssertion assume_lock(reconnect_mutex_);
197
198 if (!reconnect_queue_.empty()) {
199 // FIXME: libstdc++ (used on Windows) implements condition_variable with
200 // system_clock as its clock, so we're probably hosed if the clock changes,
201 // even if we use steady_clock throughout. This problem goes away once we
202 // switch to libc++.
203 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
204 } else {
205 reconnect_cv_.wait(lock);
206 }
207
208 if (!running_) return;
209
210 // Scan the whole list for kicked transports, so that we immediately handle an explicit
211 // disconnect request.
212 for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
213 if (it->transport->kicked()) {
214 D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
215 remove_transport(it->transport);
216 it = reconnect_queue_.erase(it);
217 } else {
218 ++it;
219 }
220 }
221
222 if (reconnect_queue_.empty()) continue;
223
224 // Go back to sleep if we either woke up spuriously, or we were woken up to remove
225 // a kicked transport, and the first transport isn't ready for reconnection yet.
226 auto now = std::chrono::steady_clock::now();
227 if (reconnect_queue_.begin()->reconnect_time > now) {
228 continue;
229 }
230
231 attempt = *reconnect_queue_.begin();
232 reconnect_queue_.erase(reconnect_queue_.begin());
233 }
234 D("attempting to reconnect %s", attempt.transport->serial.c_str());
235
236 switch (attempt.transport->Reconnect()) {
237 case ReconnectResult::Retry: {
238 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
239 if (attempt.attempts_left == 0) {
240 D("transport %s exceeded the number of retry attempts. giving up on it.",
241 attempt.transport->serial.c_str());
242 remove_transport(attempt.transport);
243 continue;
244 }
245
246 std::lock_guard<std::mutex> lock(reconnect_mutex_);
247 reconnect_queue_.emplace(ReconnectAttempt{
248 attempt.transport,
249 std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
250 attempt.attempts_left - 1});
251 continue;
252 }
253
254 case ReconnectResult::Success:
255 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
256 register_transport(attempt.transport);
257 continue;
258
259 case ReconnectResult::Abort:
260 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
261 remove_transport(attempt.transport);
262 continue;
263 }
264 }
265 }
266
267 static auto& reconnect_handler = *new ReconnectHandler();
268
269 #endif
270
271 } // namespace
272
NextTransportId()273 TransportId NextTransportId() {
274 static std::atomic<TransportId> next(1);
275 return next++;
276 }
277
Reset()278 void Connection::Reset() {
279 LOG(INFO) << "Connection::Reset(): stopping";
280 Stop();
281 }
282
Serial() const283 std::string Connection::Serial() const {
284 return transport_ ? transport_->serial_name() : "<unknown>";
285 }
286
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)287 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
288 : underlying_(std::move(connection)) {}
289
~BlockingConnectionAdapter()290 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
291 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): destructing";
292 Stop();
293 }
294
Start()295 void BlockingConnectionAdapter::Start() {
296 std::lock_guard<std::mutex> lock(mutex_);
297 if (started_) {
298 LOG(FATAL) << "BlockingConnectionAdapter(" << Serial() << "): started multiple times";
299 }
300
301 StartReadThread();
302
303 write_thread_ = std::thread([this]() {
304 LOG(INFO) << Serial() << ": write thread spawning";
305 while (true) {
306 std::unique_lock<std::mutex> lock(mutex_);
307 ScopedLockAssertion assume_locked(mutex_);
308 cv_.wait(lock, [this]() REQUIRES(mutex_) {
309 return this->stopped_ || !this->write_queue_.empty();
310 });
311
312 if (this->stopped_) {
313 return;
314 }
315
316 std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
317 this->write_queue_.pop_front();
318 lock.unlock();
319
320 if (!this->underlying_->Write(packet.get())) {
321 break;
322 }
323 }
324 std::call_once(this->error_flag_, [this]() { transport_->HandleError("write failed"); });
325 });
326
327 started_ = true;
328 }
329
StartReadThread()330 void BlockingConnectionAdapter::StartReadThread() {
331 read_thread_ = std::thread([this]() {
332 LOG(INFO) << Serial() << ": read thread spawning";
333 while (true) {
334 auto packet = std::make_unique<apacket>();
335 if (!underlying_->Read(packet.get())) {
336 PLOG(INFO) << Serial() << ": read failed";
337 break;
338 }
339
340 bool got_stls_cmd = false;
341 if (packet->msg.command == A_STLS) {
342 got_stls_cmd = true;
343 }
344
345 transport_->HandleRead(std::move(packet));
346
347 // If we received the STLS packet, we are about to perform the TLS
348 // handshake. So this read thread must stop and resume after the
349 // handshake completes otherwise this will interfere in the process.
350 if (got_stls_cmd) {
351 LOG(INFO) << Serial() << ": Received STLS packet. Stopping read thread.";
352 return;
353 }
354 }
355 std::call_once(this->error_flag_, [this]() { transport_->HandleError("read failed"); });
356 });
357 }
358
DoTlsHandshake(RSA * key,std::string * auth_key)359 bool BlockingConnectionAdapter::DoTlsHandshake(RSA* key, std::string* auth_key) {
360 std::lock_guard<std::mutex> lock(mutex_);
361 if (read_thread_.joinable()) {
362 read_thread_.join();
363 }
364 bool success = this->underlying_->DoTlsHandshake(key, auth_key);
365 StartReadThread();
366 return success;
367 }
368
Reset()369 void BlockingConnectionAdapter::Reset() {
370 {
371 std::lock_guard<std::mutex> lock(mutex_);
372 if (!started_) {
373 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
374 return;
375 }
376
377 if (stopped_) {
378 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
379 return;
380 }
381 }
382
383 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): resetting";
384 this->underlying_->Reset();
385 Stop();
386 }
387
Stop()388 void BlockingConnectionAdapter::Stop() {
389 {
390 std::lock_guard<std::mutex> lock(mutex_);
391 if (!started_) {
392 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
393 return;
394 }
395
396 if (stopped_) {
397 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
398 return;
399 }
400
401 stopped_ = true;
402 }
403
404 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopping";
405
406 this->underlying_->Close();
407 this->cv_.notify_one();
408
409 // Move the threads out into locals with the lock taken, and then unlock to let them exit.
410 std::thread read_thread;
411 std::thread write_thread;
412
413 {
414 std::lock_guard<std::mutex> lock(mutex_);
415 read_thread = std::move(read_thread_);
416 write_thread = std::move(write_thread_);
417 }
418
419 read_thread.join();
420 write_thread.join();
421
422 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopped";
423 std::call_once(this->error_flag_, [this]() { transport_->HandleError("requested stop"); });
424 }
425
Write(std::unique_ptr<apacket> packet)426 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
427 {
428 std::lock_guard<std::mutex> lock(this->mutex_);
429 write_queue_.emplace_back(std::move(packet));
430 }
431
432 cv_.notify_one();
433 return true;
434 }
435
FdConnection(unique_fd fd)436 FdConnection::FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
437
~FdConnection()438 FdConnection::~FdConnection() {}
439
DispatchRead(void * buf,size_t len)440 bool FdConnection::DispatchRead(void* buf, size_t len) {
441 if (tls_ != nullptr) {
442 // The TlsConnection doesn't allow 0 byte reads
443 if (len == 0) {
444 return true;
445 }
446 return tls_->ReadFully(buf, len);
447 }
448
449 return ReadFdExactly(fd_.get(), buf, len);
450 }
451
DispatchWrite(void * buf,size_t len)452 bool FdConnection::DispatchWrite(void* buf, size_t len) {
453 if (tls_ != nullptr) {
454 // The TlsConnection doesn't allow 0 byte writes
455 if (len == 0) {
456 return true;
457 }
458 return tls_->WriteFully(std::string_view(reinterpret_cast<const char*>(buf), len));
459 }
460
461 return WriteFdExactly(fd_.get(), buf, len);
462 }
463
Read(apacket * packet)464 bool FdConnection::Read(apacket* packet) {
465 if (!DispatchRead(&packet->msg, sizeof(amessage))) {
466 D("remote local: read terminated (message)");
467 return false;
468 }
469
470 if (packet->msg.data_length > MAX_PAYLOAD) {
471 D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
472 return false;
473 }
474
475 packet->payload.resize(packet->msg.data_length);
476
477 if (!DispatchRead(&packet->payload[0], packet->payload.size())) {
478 D("remote local: terminated (data)");
479 return false;
480 }
481
482 return true;
483 }
484
Write(apacket * packet)485 bool FdConnection::Write(apacket* packet) {
486 if (!DispatchWrite(&packet->msg, sizeof(packet->msg))) {
487 D("remote local: write terminated");
488 return false;
489 }
490
491 if (packet->msg.data_length) {
492 if (!DispatchWrite(&packet->payload[0], packet->msg.data_length)) {
493 D("remote local: write terminated");
494 return false;
495 }
496 }
497
498 return true;
499 }
500
DoTlsHandshake(RSA * key,std::string * auth_key)501 bool FdConnection::DoTlsHandshake(RSA* key, std::string* auth_key) {
502 bssl::UniquePtr<EVP_PKEY> evp_pkey(EVP_PKEY_new());
503 if (!EVP_PKEY_set1_RSA(evp_pkey.get(), key)) {
504 LOG(ERROR) << "EVP_PKEY_set1_RSA failed";
505 return false;
506 }
507 auto x509 = GenerateX509Certificate(evp_pkey.get());
508 auto x509_str = X509ToPEMString(x509.get());
509 auto evp_str = Key::ToPEMString(evp_pkey.get());
510
511 int osh = cast_handle_to_int(adb_get_os_handle(fd_));
512 #if ADB_HOST
513 tls_ = TlsConnection::Create(TlsConnection::Role::Client, x509_str, evp_str, osh);
514 #else
515 tls_ = TlsConnection::Create(TlsConnection::Role::Server, x509_str, evp_str, osh);
516 #endif
517 CHECK(tls_);
518 #if ADB_HOST
519 // TLS 1.3 gives the client no message if the server rejected the
520 // certificate. This will enable a check in the tls connection to check
521 // whether the client certificate got rejected. Note that this assumes
522 // that, on handshake success, the server speaks first.
523 tls_->EnableClientPostHandshakeCheck(true);
524 // Add callback to set the certificate when server issues the
525 // CertificateRequest.
526 tls_->SetCertificateCallback(adb_tls_set_certificate);
527 // Allow any server certificate
528 tls_->SetCertVerifyCallback([](X509_STORE_CTX*) { return 1; });
529 #else
530 // Add callback to check certificate against a list of known public keys
531 tls_->SetCertVerifyCallback(
532 [auth_key](X509_STORE_CTX* ctx) { return adbd_tls_verify_cert(ctx, auth_key); });
533 // Add the list of allowed client CA issuers
534 auto ca_list = adbd_tls_client_ca_list();
535 tls_->SetClientCAList(ca_list.get());
536 #endif
537
538 auto err = tls_->DoHandshake();
539 if (err == TlsError::Success) {
540 return true;
541 }
542
543 tls_.reset();
544 return false;
545 }
546
Close()547 void FdConnection::Close() {
548 adb_shutdown(fd_.get());
549 fd_.reset();
550 }
551
send_packet(apacket * p,atransport * t)552 void send_packet(apacket* p, atransport* t) {
553 p->msg.magic = p->msg.command ^ 0xffffffff;
554 // compute a checksum for connection/auth packets for compatibility reasons
555 if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
556 p->msg.data_check = 0;
557 } else {
558 p->msg.data_check = calculate_apacket_checksum(p);
559 }
560
561 VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
562
563 if (t == nullptr) {
564 LOG(FATAL) << "Transport is null";
565 }
566
567 if (t->Write(p) != 0) {
568 D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
569 t->Kick();
570 }
571 }
572
kick_transport(atransport * t,bool reset)573 void kick_transport(atransport* t, bool reset) {
574 std::lock_guard<std::recursive_mutex> lock(transport_lock);
575 // As kick_transport() can be called from threads without guarantee that t is valid,
576 // check if the transport is in transport_list first.
577 //
578 // TODO(jmgao): WTF? Is this actually true?
579 if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
580 if (reset) {
581 t->Reset();
582 } else {
583 t->Kick();
584 }
585 }
586
587 #if ADB_HOST
588 reconnect_handler.CheckForKicked();
589 #endif
590 }
591
592 static int transport_registration_send = -1;
593 static int transport_registration_recv = -1;
594 static fdevent* transport_registration_fde;
595
596 #if ADB_HOST
597
598 /* this adds support required by the 'track-devices' service.
599 * this is used to send the content of "list_transport" to any
600 * number of client connections that want it through a single
601 * live TCP connection
602 */
603 struct device_tracker {
604 asocket socket;
605 bool update_needed = false;
606 bool long_output = false;
607 device_tracker* next = nullptr;
608 };
609
610 /* linked list of all device trackers */
611 static device_tracker* device_tracker_list;
612
device_tracker_remove(device_tracker * tracker)613 static void device_tracker_remove(device_tracker* tracker) {
614 device_tracker** pnode = &device_tracker_list;
615 device_tracker* node = *pnode;
616
617 std::lock_guard<std::recursive_mutex> lock(transport_lock);
618 while (node) {
619 if (node == tracker) {
620 *pnode = node->next;
621 break;
622 }
623 pnode = &node->next;
624 node = *pnode;
625 }
626 }
627
device_tracker_close(asocket * socket)628 static void device_tracker_close(asocket* socket) {
629 device_tracker* tracker = (device_tracker*)socket;
630 asocket* peer = socket->peer;
631
632 D("device tracker %p removed", tracker);
633 if (peer) {
634 peer->peer = nullptr;
635 peer->close(peer);
636 }
637 device_tracker_remove(tracker);
638 delete tracker;
639 }
640
device_tracker_enqueue(asocket * socket,apacket::payload_type)641 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
642 /* you can't read from a device tracker, close immediately */
643 device_tracker_close(socket);
644 return -1;
645 }
646
device_tracker_send(device_tracker * tracker,const std::string & string)647 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
648 asocket* peer = tracker->socket.peer;
649
650 apacket::payload_type data;
651 data.resize(4 + string.size());
652 char buf[5];
653 snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
654 memcpy(&data[0], buf, 4);
655 memcpy(&data[4], string.data(), string.size());
656 return peer->enqueue(peer, std::move(data));
657 }
658
device_tracker_ready(asocket * socket)659 static void device_tracker_ready(asocket* socket) {
660 device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
661
662 // We want to send the device list when the tracker connects
663 // for the first time, even if no update occurred.
664 if (tracker->update_needed) {
665 tracker->update_needed = false;
666 device_tracker_send(tracker, list_transports(tracker->long_output));
667 }
668 }
669
create_device_tracker(bool long_output)670 asocket* create_device_tracker(bool long_output) {
671 device_tracker* tracker = new device_tracker();
672 if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
673
674 D("device tracker %p created", tracker);
675
676 tracker->socket.enqueue = device_tracker_enqueue;
677 tracker->socket.ready = device_tracker_ready;
678 tracker->socket.close = device_tracker_close;
679 tracker->update_needed = true;
680 tracker->long_output = long_output;
681
682 tracker->next = device_tracker_list;
683 device_tracker_list = tracker;
684
685 return &tracker->socket;
686 }
687
688 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)689 bool iterate_transports(std::function<bool(const atransport*)> fn) {
690 std::lock_guard<std::recursive_mutex> lock(transport_lock);
691 for (const auto& t : transport_list) {
692 if (!fn(t)) {
693 return false;
694 }
695 }
696 for (const auto& t : pending_list) {
697 if (!fn(t)) {
698 return false;
699 }
700 }
701 return true;
702 }
703
704 // Call this function each time the transport list has changed.
update_transports()705 void update_transports() {
706 update_transport_status();
707
708 // Notify `adb track-devices` clients.
709 device_tracker* tracker = device_tracker_list;
710 while (tracker != nullptr) {
711 device_tracker* next = tracker->next;
712 // This may destroy the tracker if the connection is closed.
713 device_tracker_send(tracker, list_transports(tracker->long_output));
714 tracker = next;
715 }
716 }
717
718 #else
719
// Device-side counterpart of the host implementation: there is nothing to do on the device
// side.
void update_transports() {}
723
724 #endif // ADB_HOST
725
// Message sent over the transport registration socketpair. register_transport() and
// remove_transport() write it; transport_registration_func() reads and acts on it.
726 struct tmsg {
// The transport the message is about.
727 atransport* transport;
// 0 asks for the transport to be deleted (remove_transport); any other value is treated as
// a registration (register_transport sends 1).
728 int action;
729 };
730
transport_read_action(int fd,struct tmsg * m)731 static int transport_read_action(int fd, struct tmsg* m) {
732 char* p = (char*)m;
733 int len = sizeof(*m);
734 int r;
735
736 while (len > 0) {
737 r = adb_read(fd, p, len);
738 if (r > 0) {
739 len -= r;
740 p += r;
741 } else {
742 D("transport_read_action: on fd %d: %s", fd, strerror(errno));
743 return -1;
744 }
745 }
746 return 0;
747 }
748
transport_write_action(int fd,struct tmsg * m)749 static int transport_write_action(int fd, struct tmsg* m) {
750 char* p = (char*)m;
751 int len = sizeof(*m);
752 int r;
753
754 while (len > 0) {
755 r = adb_write(fd, p, len);
756 if (r > 0) {
757 len -= r;
758 p += r;
759 } else {
760 D("transport_write_action: on fd %d: %s", fd, strerror(errno));
761 return -1;
762 }
763 }
764 return 0;
765 }
766
// Host builds: true when libusb is in use and the environment variable
// ADB_LIBUSB_START_DETACHED is exactly "1" (the variable is read once, on the first call).
// Device builds: always false.
static bool usb_devices_start_detached() {
#if ADB_HOST
    static const bool requested = []() {
        const char* value = getenv("ADB_LIBUSB_START_DETACHED");
        return value != nullptr && strcmp("1", value) == 0;
    }();
    if (!should_use_libusb()) {
        return false;
    }
    return requested;
#else
    return false;
#endif
}
776
transport_registration_func(int _fd,unsigned ev,void *)777 static void transport_registration_func(int _fd, unsigned ev, void*) {
778 tmsg m;
779 atransport* t;
780
781 if (!(ev & FDE_READ)) {
782 return;
783 }
784
785 if (transport_read_action(_fd, &m)) {
786 PLOG(FATAL) << "cannot read transport registration socket";
787 }
788
789 t = m.transport;
790
791 if (m.action == 0) {
792 D("transport: %s deleting", t->serial.c_str());
793
794 {
795 std::lock_guard<std::recursive_mutex> lock(transport_lock);
796 transport_list.remove(t);
797 }
798
799 delete t;
800
801 update_transports();
802 return;
803 }
804
805 /* don't create transport threads for inaccessible devices */
806 if (t->GetConnectionState() != kCsNoPerm) {
807 t->connection()->SetTransport(t);
808
809 if (t->type == kTransportUsb && usb_devices_start_detached()) {
810 t->SetConnectionState(kCsDetached);
811 } else {
812 t->connection()->Start();
813 #if ADB_HOST
814 send_connect(t);
815 #endif
816 }
817 }
818
819 {
820 std::lock_guard<std::recursive_mutex> lock(transport_lock);
821 auto it = std::find(pending_list.begin(), pending_list.end(), t);
822 if (it != pending_list.end()) {
823 pending_list.remove(t);
824 transport_list.push_front(t);
825 }
826 }
827
828 update_transports();
829 }
830
831 #if ADB_HOST
init_reconnect_handler(void)832 void init_reconnect_handler(void) {
833 reconnect_handler.Start();
834 }
835 #endif
836
init_transport_registration(void)837 void init_transport_registration(void) {
838 int s[2];
839
840 if (adb_socketpair(s)) {
841 PLOG(FATAL) << "cannot open transport registration socketpair";
842 }
843 D("socketpair: (%d,%d)", s[0], s[1]);
844
845 transport_registration_send = s[0];
846 transport_registration_recv = s[1];
847
848 transport_registration_fde =
849 fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
850 fdevent_set(transport_registration_fde, FDE_READ);
851 }
852
kick_all_transports()853 void kick_all_transports() {
854 #if ADB_HOST
855 reconnect_handler.Stop();
856 #endif
857 // To avoid only writing part of a packet to a transport after exit, kick all transports.
858 std::lock_guard<std::recursive_mutex> lock(transport_lock);
859 for (auto t : transport_list) {
860 t->Kick();
861 }
862 }
863
kick_all_tcp_tls_transports()864 void kick_all_tcp_tls_transports() {
865 std::lock_guard<std::recursive_mutex> lock(transport_lock);
866 for (auto t : transport_list) {
867 if (t->IsTcpDevice() && t->use_tls) {
868 t->Kick();
869 }
870 }
871 }
872
873 #if !ADB_HOST
kick_all_transports_by_auth_key(std::string_view auth_key)874 void kick_all_transports_by_auth_key(std::string_view auth_key) {
875 std::lock_guard<std::recursive_mutex> lock(transport_lock);
876 for (auto t : transport_list) {
877 if (auth_key == t->auth_key) {
878 t->Kick();
879 }
880 }
881 }
882 #endif
883
884 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)885 void register_transport(atransport* transport) {
886 tmsg m;
887 m.transport = transport;
888 m.action = 1;
889 D("transport: %s registered", transport->serial.c_str());
890 if (transport_write_action(transport_registration_send, &m)) {
891 PLOG(FATAL) << "cannot write transport registration socket";
892 }
893 }
894
remove_transport(atransport * transport)895 static void remove_transport(atransport* transport) {
896 tmsg m;
897 m.transport = transport;
898 m.action = 0;
899 D("transport: %s removed", transport->serial.c_str());
900 if (transport_write_action(transport_registration_send, &m)) {
901 PLOG(FATAL) << "cannot write transport registration socket";
902 }
903 }
904
transport_destroy(atransport * t)905 static void transport_destroy(atransport* t) {
906 check_main_thread();
907 CHECK(t != nullptr);
908
909 std::lock_guard<std::recursive_mutex> lock(transport_lock);
910 LOG(INFO) << "destroying transport " << t->serial_name();
911 t->connection()->Stop();
912 #if ADB_HOST
913 if (t->IsTcpDevice() && !t->kicked()) {
914 D("transport: %s destroy (attempting reconnection)", t->serial.c_str());
915
916 // We need to clear the transport's keys, so that on the next connection, it tries
917 // again from the beginning.
918 t->ResetKeys();
919 reconnect_handler.TrackTransport(t);
920 return;
921 }
922 #endif
923
924 D("transport: %s destroy (kicking and closing)", t->serial.c_str());
925 remove_transport(t);
926 }
927
928 #if ADB_HOST
// Tests whether |to_test| is exactly |prefix| (optional, may be null) followed by |qual|.
//
// When |sanitize_qual| is true, every character of |qual| that is not alphanumeric is
// compared as '_'. |to_test| is examined as a C string.
//
// Returns non-zero on a match and 0 otherwise. As special cases, an empty |to_test| matches
// only an empty |qual|, and an empty |qual| never matches a non-empty |to_test|.
static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
                      bool sanitize_qual) {
    if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
        return qual.empty();

    if (qual.empty()) return 0;

    const char* ptr = to_test.c_str();
    if (prefix) {
        // A mismatch (including reaching the end of to_test) ends the loop before ptr can
        // move more than one past the terminator.
        while (*prefix) {
            if (*prefix++ != *ptr++) return 0;
        }
    }

    for (char ch : qual) {
        // isalnum() requires a value representable as unsigned char; a plain char with the
        // high bit set may be negative.
        if (sanitize_qual && !isalnum(static_cast<unsigned char>(ch))) ch = '_';
        // Stop at the end of to_test even if qual contains an embedded NUL, so that ptr
        // never advances beyond the terminator.
        if (*ptr == '\0' || ch != *ptr) return 0;
        ++ptr;
    }

    /* Everything matched so far. Return true if *ptr is a NUL. */
    return !*ptr;
}
951
// When non-null, the one device this server is restricted to: either a device serial string
// or a USB device address like "usb:2-6". Null means no restriction.
char const* __transport_server_one_device = nullptr;
954
transport_set_one_device(const char * adb_one_device)955 void transport_set_one_device(const char* adb_one_device) {
956 __transport_server_one_device = adb_one_device;
957 }
958
transport_get_one_device()959 const char* transport_get_one_device() {
960 return __transport_server_one_device;
961 }
962
transport_server_owns_device(std::string_view serial)963 bool transport_server_owns_device(std::string_view serial) {
964 if (!__transport_server_one_device) {
965 // If the server doesn't own one device, server owns all devices.
966 return true;
967 }
968 return serial.compare(__transport_server_one_device) == 0;
969 }
970
transport_server_owns_device(std::string_view dev_path,std::string_view serial)971 bool transport_server_owns_device(std::string_view dev_path, std::string_view serial) {
972 if (!__transport_server_one_device) {
973 // If the server doesn't own one device, server owns all devices.
974 return true;
975 }
976 return serial.compare(__transport_server_one_device) == 0 ||
977 dev_path.compare(__transport_server_one_device) == 0;
978 }
979
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)980 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
981 bool* is_ambiguous, std::string* error_out,
982 bool accept_any_state) {
983 atransport* result = nullptr;
984
985 if (transport_id != 0) {
986 *error_out = android::base::StringPrintf("no device with transport id '%" PRIu64 "'",
987 transport_id);
988 } else if (serial) {
989 *error_out = android::base::StringPrintf("device '%s' not found", serial);
990 } else if (type == kTransportLocal) {
991 *error_out = "no emulators found";
992 } else if (type == kTransportAny) {
993 *error_out = "no devices/emulators found";
994 } else {
995 *error_out = "no devices found";
996 }
997
998 std::unique_lock<std::recursive_mutex> lock(transport_lock);
999 for (const auto& t : transport_list) {
1000 if (t->GetConnectionState() == kCsNoPerm) {
1001 *error_out = UsbNoPermissionsLongHelpText();
1002 continue;
1003 }
1004
1005 if (transport_id) {
1006 if (t->id == transport_id) {
1007 result = t;
1008 break;
1009 }
1010 } else if (serial) {
1011 if (t->MatchesTarget(serial)) {
1012 if (result) {
1013 *error_out = "more than one device";
1014 if (is_ambiguous) *is_ambiguous = true;
1015 result = nullptr;
1016 break;
1017 }
1018 result = t;
1019 }
1020 } else {
1021 if (type == kTransportUsb && t->type == kTransportUsb) {
1022 if (result) {
1023 *error_out = "more than one device";
1024 if (is_ambiguous) *is_ambiguous = true;
1025 result = nullptr;
1026 break;
1027 }
1028 result = t;
1029 } else if (type == kTransportLocal && t->type == kTransportLocal) {
1030 if (result) {
1031 *error_out = "more than one emulator";
1032 if (is_ambiguous) *is_ambiguous = true;
1033 result = nullptr;
1034 break;
1035 }
1036 result = t;
1037 } else if (type == kTransportAny) {
1038 if (result) {
1039 *error_out = "more than one device/emulator";
1040 if (is_ambiguous) *is_ambiguous = true;
1041 result = nullptr;
1042 break;
1043 }
1044 result = t;
1045 }
1046 }
1047 }
1048 lock.unlock();
1049
1050 if (result && !accept_any_state) {
1051 // The caller requires an active transport.
1052 // Make sure that we're actually connected.
1053 ConnectionState state = result->GetConnectionState();
1054 switch (state) {
1055 case kCsConnecting:
1056 *error_out = "device still connecting";
1057 result = nullptr;
1058 break;
1059
1060 case kCsAuthorizing:
1061 *error_out = "device still authorizing";
1062 result = nullptr;
1063 break;
1064
1065 case kCsUnauthorized: {
1066 *error_out = "device unauthorized.\n";
1067 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
1068 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
1069 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
1070 *error_out += "\n";
1071 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
1072 *error_out += "Otherwise check for a confirmation dialog on your device.";
1073 result = nullptr;
1074 break;
1075 }
1076
1077 case kCsOffline:
1078 *error_out = "device offline";
1079 result = nullptr;
1080 break;
1081
1082 default:
1083 break;
1084 }
1085 }
1086
1087 if (result) {
1088 *error_out = "success";
1089 }
1090
1091 return result;
1092 }
1093
WaitForConnection(std::chrono::milliseconds timeout)1094 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
1095 std::unique_lock<std::mutex> lock(mutex_);
1096 ScopedLockAssertion assume_locked(mutex_);
1097 return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
1098 return connection_established_ready_;
1099 }) && connection_established_;
1100 }
1101
SetConnectionEstablished(bool success)1102 void ConnectionWaitable::SetConnectionEstablished(bool success) {
1103 {
1104 std::lock_guard<std::mutex> lock(mutex_);
1105 if (connection_established_ready_) return;
1106 connection_established_ready_ = true;
1107 connection_established_ = success;
1108 D("connection established with %d", success);
1109 }
1110 cv_.notify_one();
1111 }
1112 #endif
1113
~atransport()1114 atransport::~atransport() {
1115 #if ADB_HOST
1116 // If the connection callback had not been run before, run it now.
1117 SetConnectionEstablished(false);
1118 #endif
1119 }
1120
Write(apacket * p)1121 int atransport::Write(apacket* p) {
1122 return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
1123 }
1124
Reset()1125 void atransport::Reset() {
1126 if (!kicked_.exchange(true)) {
1127 LOG(INFO) << "resetting transport " << this << " " << this->serial;
1128 this->connection()->Reset();
1129 }
1130 }
1131
Kick()1132 void atransport::Kick() {
1133 if (!kicked_.exchange(true)) {
1134 LOG(INFO) << "kicking transport " << this << " " << this->serial;
1135 this->connection()->Stop();
1136 }
1137 }
1138
GetConnectionState() const1139 ConnectionState atransport::GetConnectionState() const {
1140 return connection_state_;
1141 }
1142
SetConnectionState(ConnectionState state)1143 void atransport::SetConnectionState(ConnectionState state) {
1144 check_main_thread();
1145 connection_state_ = state;
1146 update_transports();
1147 }
1148
1149 #if ADB_HOST
Attach(std::string * error)1150 bool atransport::Attach(std::string* error) {
1151 D("%s: attach", serial.c_str());
1152 check_main_thread();
1153
1154 if (!should_use_libusb()) {
1155 *error = "attach/detach only implemented for libusb backend";
1156 return false;
1157 }
1158
1159 if (GetConnectionState() != ConnectionState::kCsDetached) {
1160 *error = android::base::StringPrintf("transport %s is not detached", serial.c_str());
1161 return false;
1162 }
1163
1164 ResetKeys();
1165
1166 {
1167 std::lock_guard<std::mutex> lock(mutex_);
1168 if (!connection_->Attach(error)) {
1169 return false;
1170 }
1171 }
1172
1173 send_connect(this);
1174 return true;
1175 }
1176
Detach(std::string * error)1177 bool atransport::Detach(std::string* error) {
1178 D("%s: detach", serial.c_str());
1179 check_main_thread();
1180
1181 if (!should_use_libusb()) {
1182 *error = "attach/detach only implemented for libusb backend";
1183 return false;
1184 }
1185
1186 if (GetConnectionState() == ConnectionState::kCsDetached) {
1187 *error = android::base::StringPrintf("transport %s is already detached", serial.c_str());
1188 return false;
1189 }
1190
1191 handle_offline(this);
1192
1193 {
1194 std::lock_guard<std::mutex> lock(mutex_);
1195 if (!connection_->Detach(error)) {
1196 return false;
1197 }
1198 }
1199
1200 this->SetConnectionState(kCsDetached);
1201 return true;
1202 }
1203 #endif
1204
SetConnection(std::shared_ptr<Connection> connection)1205 void atransport::SetConnection(std::shared_ptr<Connection> connection) {
1206 std::lock_guard<std::mutex> lock(mutex_);
1207 connection_ = std::shared_ptr<Connection>(std::move(connection));
1208 }
1209
HandleRead(std::unique_ptr<apacket> p)1210 bool atransport::HandleRead(std::unique_ptr<apacket> p) {
1211 if (!check_header(p.get(), this)) {
1212 D("%s: remote read: bad header", serial.c_str());
1213 return false;
1214 }
1215
1216 VLOG(TRANSPORT) << dump_packet(serial.c_str(), "from remote", p.get());
1217 apacket* packet = p.release();
1218
1219 // TODO: Does this need to run on the main thread?
1220 fdevent_run_on_main_thread([packet, this]() { handle_packet(packet, this); });
1221 return true;
1222 }
1223
HandleError(const std::string & error)1224 void atransport::HandleError(const std::string& error) {
1225 LOG(INFO) << serial_name() << ": connection terminated: " << error;
1226 fdevent_run_on_main_thread([this]() {
1227 handle_offline(this);
1228 transport_destroy(this);
1229 });
1230 }
1231
update_version(int version,size_t payload)1232 void atransport::update_version(int version, size_t payload) {
1233 protocol_version = std::min(version, A_VERSION);
1234 max_payload = std::min(payload, MAX_PAYLOAD);
1235 }
1236
get_protocol_version() const1237 int atransport::get_protocol_version() const {
1238 return protocol_version;
1239 }
1240
get_tls_version() const1241 int atransport::get_tls_version() const {
1242 return tls_version;
1243 }
1244
get_max_payload() const1245 size_t atransport::get_max_payload() const {
1246 return max_payload;
1247 }
1248
supported_features()1249 const FeatureSet& supported_features() {
1250 static const android::base::NoDestructor<FeatureSet> features([] {
1251 return FeatureSet{
1252 kFeatureShell2,
1253 kFeatureCmd,
1254 kFeatureStat2,
1255 kFeatureLs2,
1256 kFeatureFixedPushMkdir,
1257 kFeatureApex,
1258 kFeatureAbb,
1259 kFeatureFixedPushSymlinkTimestamp,
1260 kFeatureAbbExec,
1261 kFeatureRemountShell,
1262 kFeatureTrackApp,
1263 kFeatureSendRecv2,
1264 kFeatureSendRecv2Brotli,
1265 kFeatureSendRecv2LZ4,
1266 kFeatureSendRecv2Zstd,
1267 kFeatureSendRecv2DryRunSend,
1268 kFeatureOpenscreenMdns,
1269 // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1270 // to know about. Otherwise, the client can be stuck running an old
1271 // version of the server even after upgrading their copy of adb.
1272 // (http://b/24370690)
1273 };
1274 }());
1275
1276 return *features;
1277 }
1278
FeatureSetToString(const FeatureSet & features)1279 std::string FeatureSetToString(const FeatureSet& features) {
1280 return android::base::Join(features, ',');
1281 }
1282
StringToFeatureSet(const std::string & features_string)1283 FeatureSet StringToFeatureSet(const std::string& features_string) {
1284 if (features_string.empty()) {
1285 return FeatureSet();
1286 }
1287
1288 return android::base::Split(features_string, ",");
1289 }
1290
// Returns true if any element of |r| compares equal (operator==) to |v|.
// Linear scan; an empty range yields false.
template <class Range, class Value>
static bool contains(const Range& r, const Value& v) {
    const auto last = std::end(r);
    for (auto it = std::begin(r); it != last; ++it) {
        if (*it == v) {
            return true;
        }
    }
    return false;
}
1295
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1296 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1297 return contains(feature_set, feature) && contains(supported_features(), feature);
1298 }
1299
has_feature(const std::string & feature) const1300 bool atransport::has_feature(const std::string& feature) const {
1301 return contains(features_, feature);
1302 }
1303
SetFeatures(const std::string & features_string)1304 void atransport::SetFeatures(const std::string& features_string) {
1305 features_ = StringToFeatureSet(features_string);
1306 }
1307
AddDisconnect(adisconnect * disconnect)1308 void atransport::AddDisconnect(adisconnect* disconnect) {
1309 disconnects_.push_back(disconnect);
1310 }
1311
RemoveDisconnect(adisconnect * disconnect)1312 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1313 disconnects_.remove(disconnect);
1314 }
1315
RunDisconnects()1316 void atransport::RunDisconnects() {
1317 for (const auto& disconnect : disconnects_) {
1318 disconnect->func(disconnect->opaque, this);
1319 }
1320 disconnects_.clear();
1321 }
1322
1323 #if ADB_HOST
MatchesTarget(const std::string & target) const1324 bool atransport::MatchesTarget(const std::string& target) const {
1325 if (!serial.empty()) {
1326 if (target == serial) {
1327 return true;
1328 } else if (type == kTransportLocal) {
1329 // Local transports can match [tcp:|udp:]<hostname>[:port].
1330 const char* local_target_ptr = target.c_str();
1331
1332 // For fastboot compatibility, ignore protocol prefixes.
1333 if (android::base::StartsWith(target, "tcp:") ||
1334 android::base::StartsWith(target, "udp:")) {
1335 local_target_ptr += 4;
1336 }
1337
1338 // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1339 std::string serial_host, error;
1340 int serial_port = -1;
1341 if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr,
1342 &error)) {
1343 // |target| may omit the port to default to ours.
1344 std::string target_host;
1345 int target_port = serial_port;
1346 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1347 nullptr, &error) &&
1348 serial_host == target_host && serial_port == target_port) {
1349 return true;
1350 }
1351 }
1352 }
1353 }
1354
1355 return (target == devpath) || qual_match(target, "product:", product, false) ||
1356 qual_match(target, "model:", model, true) ||
1357 qual_match(target, "device:", device, false);
1358 }
1359
SetConnectionEstablished(bool success)1360 void atransport::SetConnectionEstablished(bool success) {
1361 connection_waitable_->SetConnectionEstablished(success);
1362 }
1363
Reconnect()1364 ReconnectResult atransport::Reconnect() {
1365 return reconnect_(this);
1366 }
1367
// Returns a copy of |str| that is safe to embed in newline-delimited transport listings.
// We use newline as our delimiter, make sure to never output it.
//
// If |alphanumeric| is true, every character that is not alphanumeric (per isalnum()) is
// replaced with '_'; otherwise only '\n' is replaced.
static std::string sanitize(std::string str, bool alphanumeric) {
    std::replace_if(
            str.begin(), str.end(),
            [alphanumeric](const char c) {
                if (alphanumeric) {
                    // <ctype.h> classifiers require a value representable as unsigned char
                    // (or EOF). Plain char may be signed, so a non-ASCII byte would be
                    // passed as a negative int, which is undefined behaviour.
                    return !isalnum(static_cast<unsigned char>(c));
                }
                return c == '\n';
            },
            '_');
    return str;
}
1375
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1376 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1377 bool alphanumeric) {
1378 if (value.empty()) {
1379 return;
1380 }
1381
1382 *result += ' ';
1383 *result += key;
1384 *result += sanitize(value, alphanumeric);
1385 }
1386
append_transport(const atransport * t,std::string * result,bool long_listing)1387 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1388 std::string serial = t->serial;
1389 if (serial.empty()) {
1390 serial = "(no serial number)";
1391 }
1392
1393 if (!long_listing) {
1394 *result += serial;
1395 *result += '\t';
1396 *result += to_string(t->GetConnectionState());
1397 } else {
1398 android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1399 to_string(t->GetConnectionState()).c_str());
1400
1401 append_transport_info(result, "", t->devpath, false);
1402 append_transport_info(result, "product:", t->product, false);
1403 append_transport_info(result, "model:", t->model, true);
1404 append_transport_info(result, "device:", t->device, false);
1405
1406 // Put id at the end, so that anyone parsing the output here can always find it by scanning
1407 // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1408 *result += " transport_id:";
1409 *result += std::to_string(t->id);
1410 }
1411 *result += '\n';
1412 }
1413
list_transports(bool long_listing)1414 std::string list_transports(bool long_listing) {
1415 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1416
1417 auto sorted_transport_list = transport_list;
1418 sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1419 if (x->type != y->type) {
1420 return x->type < y->type;
1421 }
1422 return x->serial < y->serial;
1423 });
1424
1425 std::string result;
1426 for (const auto& t : sorted_transport_list) {
1427 append_transport(t, &result, long_listing);
1428 }
1429 return result;
1430 }
1431
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1432 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1433 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1434 for (auto& t : transport_list) {
1435 if (predicate(t)) {
1436 if (reset) {
1437 t->Reset();
1438 } else {
1439 t->Kick();
1440 }
1441 }
1442 }
1443 }
1444
1445 /* hack for osx */
close_usb_devices(bool reset)1446 void close_usb_devices(bool reset) {
1447 close_usb_devices([](const atransport*) { return true; }, reset);
1448 }
1449 #endif
1450
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,bool use_tls,int * error)1451 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1452 atransport::ReconnectCallback reconnect, bool use_tls, int* error) {
1453 atransport* t = new atransport(std::move(reconnect), kCsOffline);
1454 t->use_tls = use_tls;
1455
1456 D("transport: %s init'ing for socket %d, on port %d", serial.c_str(), s.get(), port);
1457 if (init_socket_transport(t, std::move(s), port, local) < 0) {
1458 delete t;
1459 if (error) *error = errno;
1460 return false;
1461 }
1462
1463 std::unique_lock<std::recursive_mutex> lock(transport_lock);
1464 for (const auto& transport : pending_list) {
1465 if (serial == transport->serial) {
1466 VLOG(TRANSPORT) << "socket transport " << transport->serial
1467 << " is already in pending_list and fails to register";
1468 delete t;
1469 if (error) *error = EALREADY;
1470 return false;
1471 }
1472 }
1473
1474 for (const auto& transport : transport_list) {
1475 if (serial == transport->serial) {
1476 VLOG(TRANSPORT) << "socket transport " << transport->serial
1477 << " is already in transport_list and fails to register";
1478 delete t;
1479 if (error) *error = EALREADY;
1480 return false;
1481 }
1482 }
1483
1484 t->serial = std::move(serial);
1485 pending_list.push_front(t);
1486
1487 lock.unlock();
1488
1489 #if ADB_HOST
1490 auto waitable = t->connection_waitable();
1491 #endif
1492 register_transport(t);
1493
1494 if (local == 1) {
1495 // Do not wait for emulator transports.
1496 return true;
1497 }
1498
1499 #if ADB_HOST
1500 if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1501 if (error) *error = ETIMEDOUT;
1502 return false;
1503 }
1504
1505 if (t->GetConnectionState() == kCsUnauthorized) {
1506 if (error) *error = EPERM;
1507 return false;
1508 }
1509 #endif
1510
1511 return true;
1512 }
1513
1514 #if ADB_HOST
find_transport(const char * serial)1515 atransport* find_transport(const char* serial) {
1516 atransport* result = nullptr;
1517
1518 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1519 for (auto& t : transport_list) {
1520 if (strcmp(serial, t->serial.c_str()) == 0) {
1521 result = t;
1522 break;
1523 }
1524 }
1525
1526 return result;
1527 }
1528
kick_all_tcp_devices()1529 void kick_all_tcp_devices() {
1530 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1531 for (auto& t : transport_list) {
1532 if (t->IsTcpDevice()) {
1533 // Kicking breaks the read_transport thread of this transport out of any read, then
1534 // the read_transport thread will notify the main thread to make this transport
1535 // offline. Then the main thread will notify the write_transport thread to exit.
1536 // Finally, this transport will be closed and freed in the main thread.
1537 t->Kick();
1538 }
1539 }
1540 reconnect_handler.CheckForKicked();
1541 }
1542
register_usb_transport(std::shared_ptr<Connection> connection,const char * serial,const char * devpath,unsigned writeable)1543 void register_usb_transport(std::shared_ptr<Connection> connection, const char* serial,
1544 const char* devpath, unsigned writeable) {
1545 atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1546 if (serial) {
1547 t->serial = serial;
1548 }
1549 if (devpath) {
1550 t->devpath = devpath;
1551 }
1552
1553 t->SetConnection(std::move(connection));
1554 t->type = kTransportUsb;
1555
1556 {
1557 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1558 pending_list.push_front(t);
1559 }
1560
1561 register_transport(t);
1562 }
1563
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1564 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1565 unsigned writeable) {
1566 atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1567
1568 D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1569 init_usb_transport(t, usb);
1570 if (serial) {
1571 t->serial = serial;
1572 }
1573
1574 if (devpath) {
1575 t->devpath = devpath;
1576 }
1577
1578 {
1579 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1580 pending_list.push_front(t);
1581 }
1582
1583 register_transport(t);
1584 }
1585
1586 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1587 void unregister_usb_transport(usb_handle* usb) {
1588 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1589 transport_list.remove_if([usb](atransport* t) {
1590 return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1591 });
1592 }
1593 #endif
1594
check_header(apacket * p,atransport * t)1595 bool check_header(apacket* p, atransport* t) {
1596 if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1597 VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1598 << ", magic = " << p->msg.magic;
1599 return false;
1600 }
1601
1602 if (p->msg.data_length > t->get_max_payload()) {
1603 VLOG(RWX) << "check_header(): " << p->msg.data_length
1604 << " atransport::max_payload = " << t->get_max_payload();
1605 return false;
1606 }
1607
1608 return true;
1609 }
1610
1611 #if ADB_HOST
Key()1612 std::shared_ptr<RSA> atransport::Key() {
1613 if (keys_.empty()) {
1614 return nullptr;
1615 }
1616
1617 std::shared_ptr<RSA> result = keys_[0];
1618 return result;
1619 }
1620
NextKey()1621 std::shared_ptr<RSA> atransport::NextKey() {
1622 if (keys_.empty()) {
1623 LOG(INFO) << "fetching keys for transport " << this->serial_name();
1624 keys_ = adb_auth_get_private_keys();
1625
1626 // We should have gotten at least one key: the one that's automatically generated.
1627 CHECK(!keys_.empty());
1628 } else {
1629 keys_.pop_front();
1630 }
1631
1632 return Key();
1633 }
1634
ResetKeys()1635 void atransport::ResetKeys() {
1636 keys_.clear();
1637 }
1638 #endif
1639