• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG TRANSPORT
18 
19 #include "sysdeps.h"
20 
21 #include "transport.h"
22 
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 
31 #include <algorithm>
32 #include <list>
33 #include <memory>
34 #include <mutex>
35 #include <set>
36 #include <thread>
37 
38 #include <adb/crypto/rsa_2048_key.h>
39 #include <adb/crypto/x509_generator.h>
40 #include <adb/tls/tls_connection.h>
41 #include <android-base/logging.h>
42 #include <android-base/no_destructor.h>
43 #include <android-base/parsenetaddress.h>
44 #include <android-base/stringprintf.h>
45 #include <android-base/strings.h>
46 #include <android-base/thread_annotations.h>
47 #include <diagnose_usb.h>
48 
49 #include "adb.h"
50 #include "adb_auth.h"
51 #include "adb_io.h"
52 #include "adb_trace.h"
53 #include "adb_utils.h"
54 #include "fdevent/fdevent.h"
55 #include "sysdeps/chrono.h"
56 
57 #if ADB_HOST
58 #include "client/usb.h"
59 #endif
60 
61 using namespace adb::crypto;
62 using namespace adb::tls;
63 using android::base::ScopedLockAssertion;
64 using TlsError = TlsConnection::TlsError;
65 
66 static void remove_transport(atransport* transport);
67 static void transport_destroy(atransport* transport);
68 
69 // TODO: unordered_map<TransportId, atransport*>
70 static auto& transport_list = *new std::list<atransport*>();
71 static auto& pending_list = *new std::list<atransport*>();
72 
73 static auto& transport_lock = *new std::recursive_mutex();
74 
75 const char* const kFeatureShell2 = "shell_v2";
76 const char* const kFeatureCmd = "cmd";
77 const char* const kFeatureStat2 = "stat_v2";
78 const char* const kFeatureLs2 = "ls_v2";
79 const char* const kFeatureLibusb = "libusb";
80 const char* const kFeaturePushSync = "push_sync";
81 const char* const kFeatureApex = "apex";
82 const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
83 const char* const kFeatureAbb = "abb";
84 const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
85 const char* const kFeatureAbbExec = "abb_exec";
86 const char* const kFeatureRemountShell = "remount_shell";
87 const char* const kFeatureTrackApp = "track_app";
88 const char* const kFeatureSendRecv2 = "sendrecv_v2";
89 const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
90 const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
91 const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd";
92 const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send";
93 // TODO(joshuaduong): Bump to v2 when openscreen discovery is enabled by default
94 const char* const kFeatureOpenscreenMdns = "openscreen_mdns";
95 
96 namespace {
97 
98 #if ADB_HOST
99 
100 // Tracks and handles atransport*s that are attempting reconnection.
101 class ReconnectHandler {
102   public:
103     ReconnectHandler() = default;
104     ~ReconnectHandler() = default;
105 
106     // Starts the ReconnectHandler thread.
107     void Start();
108 
109     // Requests the ReconnectHandler thread to stop.
110     void Stop();
111 
112     // Adds the atransport* to the queue of reconnect attempts.
113     void TrackTransport(atransport* transport);
114 
115     // Wake up the ReconnectHandler thread to have it check for kicked transports.
116     void CheckForKicked();
117 
118   private:
119     // The main thread loop.
120     void Run();
121 
122     // Tracks a reconnection attempt.
123     struct ReconnectAttempt {
124         atransport* transport;
125         std::chrono::steady_clock::time_point reconnect_time;
126         size_t attempts_left;
127 
operator <__anoneaa291550111::ReconnectHandler::ReconnectAttempt128         bool operator<(const ReconnectAttempt& rhs) const {
129             if (reconnect_time == rhs.reconnect_time) {
130                 return reinterpret_cast<uintptr_t>(transport) <
131                        reinterpret_cast<uintptr_t>(rhs.transport);
132             }
133             return reconnect_time < rhs.reconnect_time;
134         }
135     };
136 
137     // Only retry for up to one minute.
138     static constexpr const std::chrono::seconds kDefaultTimeout = 3s;
139     static constexpr const size_t kMaxAttempts = 20;
140 
141     // Protects all members.
142     std::mutex reconnect_mutex_;
143     bool running_ GUARDED_BY(reconnect_mutex_) = true;
144     std::thread handler_thread_;
145     std::condition_variable reconnect_cv_;
146     std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
147 
148     DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
149 };
150 
Start()151 void ReconnectHandler::Start() {
152     check_main_thread();
153     handler_thread_ = std::thread(&ReconnectHandler::Run, this);
154 }
155 
Stop()156 void ReconnectHandler::Stop() {
157     check_main_thread();
158     {
159         std::lock_guard<std::mutex> lock(reconnect_mutex_);
160         running_ = false;
161     }
162     reconnect_cv_.notify_one();
163     handler_thread_.join();
164 
165     // Drain the queue to free all resources.
166     std::lock_guard<std::mutex> lock(reconnect_mutex_);
167     while (!reconnect_queue_.empty()) {
168         ReconnectAttempt attempt = *reconnect_queue_.begin();
169         reconnect_queue_.erase(reconnect_queue_.begin());
170         remove_transport(attempt.transport);
171     }
172 }
173 
TrackTransport(atransport * transport)174 void ReconnectHandler::TrackTransport(atransport* transport) {
175     check_main_thread();
176     {
177         std::lock_guard<std::mutex> lock(reconnect_mutex_);
178         if (!running_) return;
179         // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
180         auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
181         reconnect_queue_.emplace(
182                 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
183     }
184     reconnect_cv_.notify_one();
185 }
186 
CheckForKicked()187 void ReconnectHandler::CheckForKicked() {
188     reconnect_cv_.notify_one();
189 }
190 
Run()191 void ReconnectHandler::Run() {
192     while (true) {
193         ReconnectAttempt attempt;
194         {
195             std::unique_lock<std::mutex> lock(reconnect_mutex_);
196             ScopedLockAssertion assume_lock(reconnect_mutex_);
197 
198             if (!reconnect_queue_.empty()) {
199                 // FIXME: libstdc++ (used on Windows) implements condition_variable with
200                 //        system_clock as its clock, so we're probably hosed if the clock changes,
201                 //        even if we use steady_clock throughout. This problem goes away once we
202                 //        switch to libc++.
203                 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
204             } else {
205                 reconnect_cv_.wait(lock);
206             }
207 
208             if (!running_) return;
209 
210             // Scan the whole list for kicked transports, so that we immediately handle an explicit
211             // disconnect request.
212             for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
213                 if (it->transport->kicked()) {
214                     D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
215                     remove_transport(it->transport);
216                     it = reconnect_queue_.erase(it);
217                 } else {
218                     ++it;
219                 }
220             }
221 
222             if (reconnect_queue_.empty()) continue;
223 
224             // Go back to sleep if we either woke up spuriously, or we were woken up to remove
225             // a kicked transport, and the first transport isn't ready for reconnection yet.
226             auto now = std::chrono::steady_clock::now();
227             if (reconnect_queue_.begin()->reconnect_time > now) {
228                 continue;
229             }
230 
231             attempt = *reconnect_queue_.begin();
232             reconnect_queue_.erase(reconnect_queue_.begin());
233         }
234         D("attempting to reconnect %s", attempt.transport->serial.c_str());
235 
236         switch (attempt.transport->Reconnect()) {
237             case ReconnectResult::Retry: {
238                 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
239                 if (attempt.attempts_left == 0) {
240                     D("transport %s exceeded the number of retry attempts. giving up on it.",
241                       attempt.transport->serial.c_str());
242                     remove_transport(attempt.transport);
243                     continue;
244                 }
245 
246                 std::lock_guard<std::mutex> lock(reconnect_mutex_);
247                 reconnect_queue_.emplace(ReconnectAttempt{
248                         attempt.transport,
249                         std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
250                         attempt.attempts_left - 1});
251                 continue;
252             }
253 
254             case ReconnectResult::Success:
255                 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
256                 register_transport(attempt.transport);
257                 continue;
258 
259             case ReconnectResult::Abort:
260                 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
261                 remove_transport(attempt.transport);
262                 continue;
263         }
264     }
265 }
266 
267 static auto& reconnect_handler = *new ReconnectHandler();
268 
269 #endif
270 
271 }  // namespace
272 
NextTransportId()273 TransportId NextTransportId() {
274     static std::atomic<TransportId> next(1);
275     return next++;
276 }
277 
Reset()278 void Connection::Reset() {
279     LOG(INFO) << "Connection::Reset(): stopping";
280     Stop();
281 }
282 
Serial() const283 std::string Connection::Serial() const {
284     return transport_ ? transport_->serial_name() : "<unknown>";
285 }
286 
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)287 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
288     : underlying_(std::move(connection)) {}
289 
~BlockingConnectionAdapter()290 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
291     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): destructing";
292     Stop();
293 }
294 
Start()295 void BlockingConnectionAdapter::Start() {
296     std::lock_guard<std::mutex> lock(mutex_);
297     if (started_) {
298         LOG(FATAL) << "BlockingConnectionAdapter(" << Serial() << "): started multiple times";
299     }
300 
301     StartReadThread();
302 
303     write_thread_ = std::thread([this]() {
304         LOG(INFO) << Serial() << ": write thread spawning";
305         while (true) {
306             std::unique_lock<std::mutex> lock(mutex_);
307             ScopedLockAssertion assume_locked(mutex_);
308             cv_.wait(lock, [this]() REQUIRES(mutex_) {
309                 return this->stopped_ || !this->write_queue_.empty();
310             });
311 
312             if (this->stopped_) {
313                 return;
314             }
315 
316             std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
317             this->write_queue_.pop_front();
318             lock.unlock();
319 
320             if (!this->underlying_->Write(packet.get())) {
321                 break;
322             }
323         }
324         std::call_once(this->error_flag_, [this]() { transport_->HandleError("write failed"); });
325     });
326 
327     started_ = true;
328 }
329 
StartReadThread()330 void BlockingConnectionAdapter::StartReadThread() {
331     read_thread_ = std::thread([this]() {
332         LOG(INFO) << Serial() << ": read thread spawning";
333         while (true) {
334             auto packet = std::make_unique<apacket>();
335             if (!underlying_->Read(packet.get())) {
336                 PLOG(INFO) << Serial() << ": read failed";
337                 break;
338             }
339 
340             bool got_stls_cmd = false;
341             if (packet->msg.command == A_STLS) {
342                 got_stls_cmd = true;
343             }
344 
345             transport_->HandleRead(std::move(packet));
346 
347             // If we received the STLS packet, we are about to perform the TLS
348             // handshake. So this read thread must stop and resume after the
349             // handshake completes otherwise this will interfere in the process.
350             if (got_stls_cmd) {
351                 LOG(INFO) << Serial() << ": Received STLS packet. Stopping read thread.";
352                 return;
353             }
354         }
355         std::call_once(this->error_flag_, [this]() { transport_->HandleError("read failed"); });
356     });
357 }
358 
DoTlsHandshake(RSA * key,std::string * auth_key)359 bool BlockingConnectionAdapter::DoTlsHandshake(RSA* key, std::string* auth_key) {
360     std::lock_guard<std::mutex> lock(mutex_);
361     if (read_thread_.joinable()) {
362         read_thread_.join();
363     }
364     bool success = this->underlying_->DoTlsHandshake(key, auth_key);
365     StartReadThread();
366     return success;
367 }
368 
Reset()369 void BlockingConnectionAdapter::Reset() {
370     {
371         std::lock_guard<std::mutex> lock(mutex_);
372         if (!started_) {
373             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
374             return;
375         }
376 
377         if (stopped_) {
378             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
379             return;
380         }
381     }
382 
383     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): resetting";
384     this->underlying_->Reset();
385     Stop();
386 }
387 
Stop()388 void BlockingConnectionAdapter::Stop() {
389     {
390         std::lock_guard<std::mutex> lock(mutex_);
391         if (!started_) {
392             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
393             return;
394         }
395 
396         if (stopped_) {
397             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
398             return;
399         }
400 
401         stopped_ = true;
402     }
403 
404     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopping";
405 
406     this->underlying_->Close();
407     this->cv_.notify_one();
408 
409     // Move the threads out into locals with the lock taken, and then unlock to let them exit.
410     std::thread read_thread;
411     std::thread write_thread;
412 
413     {
414         std::lock_guard<std::mutex> lock(mutex_);
415         read_thread = std::move(read_thread_);
416         write_thread = std::move(write_thread_);
417     }
418 
419     read_thread.join();
420     write_thread.join();
421 
422     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopped";
423     std::call_once(this->error_flag_, [this]() { transport_->HandleError("requested stop"); });
424 }
425 
Write(std::unique_ptr<apacket> packet)426 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
427     {
428         std::lock_guard<std::mutex> lock(this->mutex_);
429         write_queue_.emplace_back(std::move(packet));
430     }
431 
432     cv_.notify_one();
433     return true;
434 }
435 
FdConnection(unique_fd fd)436 FdConnection::FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
437 
~FdConnection()438 FdConnection::~FdConnection() {}
439 
DispatchRead(void * buf,size_t len)440 bool FdConnection::DispatchRead(void* buf, size_t len) {
441     if (tls_ != nullptr) {
442         // The TlsConnection doesn't allow 0 byte reads
443         if (len == 0) {
444             return true;
445         }
446         return tls_->ReadFully(buf, len);
447     }
448 
449     return ReadFdExactly(fd_.get(), buf, len);
450 }
451 
DispatchWrite(void * buf,size_t len)452 bool FdConnection::DispatchWrite(void* buf, size_t len) {
453     if (tls_ != nullptr) {
454         // The TlsConnection doesn't allow 0 byte writes
455         if (len == 0) {
456             return true;
457         }
458         return tls_->WriteFully(std::string_view(reinterpret_cast<const char*>(buf), len));
459     }
460 
461     return WriteFdExactly(fd_.get(), buf, len);
462 }
463 
Read(apacket * packet)464 bool FdConnection::Read(apacket* packet) {
465     if (!DispatchRead(&packet->msg, sizeof(amessage))) {
466         D("remote local: read terminated (message)");
467         return false;
468     }
469 
470     if (packet->msg.data_length > MAX_PAYLOAD) {
471         D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
472         return false;
473     }
474 
475     packet->payload.resize(packet->msg.data_length);
476 
477     if (!DispatchRead(&packet->payload[0], packet->payload.size())) {
478         D("remote local: terminated (data)");
479         return false;
480     }
481 
482     return true;
483 }
484 
Write(apacket * packet)485 bool FdConnection::Write(apacket* packet) {
486     if (!DispatchWrite(&packet->msg, sizeof(packet->msg))) {
487         D("remote local: write terminated");
488         return false;
489     }
490 
491     if (packet->msg.data_length) {
492         if (!DispatchWrite(&packet->payload[0], packet->msg.data_length)) {
493             D("remote local: write terminated");
494             return false;
495         }
496     }
497 
498     return true;
499 }
500 
DoTlsHandshake(RSA * key,std::string * auth_key)501 bool FdConnection::DoTlsHandshake(RSA* key, std::string* auth_key) {
502     bssl::UniquePtr<EVP_PKEY> evp_pkey(EVP_PKEY_new());
503     if (!EVP_PKEY_set1_RSA(evp_pkey.get(), key)) {
504         LOG(ERROR) << "EVP_PKEY_set1_RSA failed";
505         return false;
506     }
507     auto x509 = GenerateX509Certificate(evp_pkey.get());
508     auto x509_str = X509ToPEMString(x509.get());
509     auto evp_str = Key::ToPEMString(evp_pkey.get());
510 
511     int osh = cast_handle_to_int(adb_get_os_handle(fd_));
512 #if ADB_HOST
513     tls_ = TlsConnection::Create(TlsConnection::Role::Client, x509_str, evp_str, osh);
514 #else
515     tls_ = TlsConnection::Create(TlsConnection::Role::Server, x509_str, evp_str, osh);
516 #endif
517     CHECK(tls_);
518 #if ADB_HOST
519     // TLS 1.3 gives the client no message if the server rejected the
520     // certificate. This will enable a check in the tls connection to check
521     // whether the client certificate got rejected. Note that this assumes
522     // that, on handshake success, the server speaks first.
523     tls_->EnableClientPostHandshakeCheck(true);
524     // Add callback to set the certificate when server issues the
525     // CertificateRequest.
526     tls_->SetCertificateCallback(adb_tls_set_certificate);
527     // Allow any server certificate
528     tls_->SetCertVerifyCallback([](X509_STORE_CTX*) { return 1; });
529 #else
530     // Add callback to check certificate against a list of known public keys
531     tls_->SetCertVerifyCallback(
532             [auth_key](X509_STORE_CTX* ctx) { return adbd_tls_verify_cert(ctx, auth_key); });
533     // Add the list of allowed client CA issuers
534     auto ca_list = adbd_tls_client_ca_list();
535     tls_->SetClientCAList(ca_list.get());
536 #endif
537 
538     auto err = tls_->DoHandshake();
539     if (err == TlsError::Success) {
540         return true;
541     }
542 
543     tls_.reset();
544     return false;
545 }
546 
Close()547 void FdConnection::Close() {
548     adb_shutdown(fd_.get());
549     fd_.reset();
550 }
551 
send_packet(apacket * p,atransport * t)552 void send_packet(apacket* p, atransport* t) {
553     p->msg.magic = p->msg.command ^ 0xffffffff;
554     // compute a checksum for connection/auth packets for compatibility reasons
555     if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
556         p->msg.data_check = 0;
557     } else {
558         p->msg.data_check = calculate_apacket_checksum(p);
559     }
560 
561     VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
562 
563     if (t == nullptr) {
564         LOG(FATAL) << "Transport is null";
565     }
566 
567     if (t->Write(p) != 0) {
568         D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
569         t->Kick();
570     }
571 }
572 
kick_transport(atransport * t,bool reset)573 void kick_transport(atransport* t, bool reset) {
574     std::lock_guard<std::recursive_mutex> lock(transport_lock);
575     // As kick_transport() can be called from threads without guarantee that t is valid,
576     // check if the transport is in transport_list first.
577     //
578     // TODO(jmgao): WTF? Is this actually true?
579     if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
580         if (reset) {
581             t->Reset();
582         } else {
583             t->Kick();
584         }
585     }
586 
587 #if ADB_HOST
588     reconnect_handler.CheckForKicked();
589 #endif
590 }
591 
592 static int transport_registration_send = -1;
593 static int transport_registration_recv = -1;
594 static fdevent* transport_registration_fde;
595 
596 #if ADB_HOST
597 
598 /* this adds support required by the 'track-devices' service.
599  * this is used to send the content of "list_transport" to any
600  * number of client connections that want it through a single
601  * live TCP connection
602  */
603 struct device_tracker {
604     asocket socket;
605     bool update_needed = false;
606     bool long_output = false;
607     device_tracker* next = nullptr;
608 };
609 
610 /* linked list of all device trackers */
611 static device_tracker* device_tracker_list;
612 
device_tracker_remove(device_tracker * tracker)613 static void device_tracker_remove(device_tracker* tracker) {
614     device_tracker** pnode = &device_tracker_list;
615     device_tracker* node = *pnode;
616 
617     std::lock_guard<std::recursive_mutex> lock(transport_lock);
618     while (node) {
619         if (node == tracker) {
620             *pnode = node->next;
621             break;
622         }
623         pnode = &node->next;
624         node = *pnode;
625     }
626 }
627 
device_tracker_close(asocket * socket)628 static void device_tracker_close(asocket* socket) {
629     device_tracker* tracker = (device_tracker*)socket;
630     asocket* peer = socket->peer;
631 
632     D("device tracker %p removed", tracker);
633     if (peer) {
634         peer->peer = nullptr;
635         peer->close(peer);
636     }
637     device_tracker_remove(tracker);
638     delete tracker;
639 }
640 
device_tracker_enqueue(asocket * socket,apacket::payload_type)641 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
642     /* you can't read from a device tracker, close immediately */
643     device_tracker_close(socket);
644     return -1;
645 }
646 
device_tracker_send(device_tracker * tracker,const std::string & string)647 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
648     asocket* peer = tracker->socket.peer;
649 
650     apacket::payload_type data;
651     data.resize(4 + string.size());
652     char buf[5];
653     snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
654     memcpy(&data[0], buf, 4);
655     memcpy(&data[4], string.data(), string.size());
656     return peer->enqueue(peer, std::move(data));
657 }
658 
device_tracker_ready(asocket * socket)659 static void device_tracker_ready(asocket* socket) {
660     device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
661 
662     // We want to send the device list when the tracker connects
663     // for the first time, even if no update occurred.
664     if (tracker->update_needed) {
665         tracker->update_needed = false;
666         device_tracker_send(tracker, list_transports(tracker->long_output));
667     }
668 }
669 
create_device_tracker(bool long_output)670 asocket* create_device_tracker(bool long_output) {
671     device_tracker* tracker = new device_tracker();
672     if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
673 
674     D("device tracker %p created", tracker);
675 
676     tracker->socket.enqueue = device_tracker_enqueue;
677     tracker->socket.ready = device_tracker_ready;
678     tracker->socket.close = device_tracker_close;
679     tracker->update_needed = true;
680     tracker->long_output = long_output;
681 
682     tracker->next = device_tracker_list;
683     device_tracker_list = tracker;
684 
685     return &tracker->socket;
686 }
687 
688 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)689 bool iterate_transports(std::function<bool(const atransport*)> fn) {
690     std::lock_guard<std::recursive_mutex> lock(transport_lock);
691     for (const auto& t : transport_list) {
692         if (!fn(t)) {
693             return false;
694         }
695     }
696     for (const auto& t : pending_list) {
697         if (!fn(t)) {
698             return false;
699         }
700     }
701     return true;
702 }
703 
704 // Call this function each time the transport list has changed.
update_transports()705 void update_transports() {
706     update_transport_status();
707 
708     // Notify `adb track-devices` clients.
709     device_tracker* tracker = device_tracker_list;
710     while (tracker != nullptr) {
711         device_tracker* next = tracker->next;
712         // This may destroy the tracker if the connection is closed.
713         device_tracker_send(tracker, list_transports(tracker->long_output));
714         tracker = next;
715     }
716 }
717 
718 #else
719 
update_transports()720 void update_transports() {
721     // Nothing to do on the device side.
722 }
723 
724 #endif  // ADB_HOST
725 
726 struct tmsg {
727     atransport* transport;
728     int action;
729 };
730 
transport_read_action(int fd,struct tmsg * m)731 static int transport_read_action(int fd, struct tmsg* m) {
732     char* p = (char*)m;
733     int len = sizeof(*m);
734     int r;
735 
736     while (len > 0) {
737         r = adb_read(fd, p, len);
738         if (r > 0) {
739             len -= r;
740             p += r;
741         } else {
742             D("transport_read_action: on fd %d: %s", fd, strerror(errno));
743             return -1;
744         }
745     }
746     return 0;
747 }
748 
transport_write_action(int fd,struct tmsg * m)749 static int transport_write_action(int fd, struct tmsg* m) {
750     char* p = (char*)m;
751     int len = sizeof(*m);
752     int r;
753 
754     while (len > 0) {
755         r = adb_write(fd, p, len);
756         if (r > 0) {
757             len -= r;
758             p += r;
759         } else {
760             D("transport_write_action: on fd %d: %s", fd, strerror(errno));
761             return -1;
762         }
763     }
764     return 0;
765 }
766 
usb_devices_start_detached()767 static bool usb_devices_start_detached() {
768 #if ADB_HOST
769     static const char* env = getenv("ADB_LIBUSB_START_DETACHED");
770     static bool result = env && strcmp("1", env) == 0;
771     return should_use_libusb() && result;
772 #else
773     return false;
774 #endif
775 }
776 
transport_registration_func(int _fd,unsigned ev,void *)777 static void transport_registration_func(int _fd, unsigned ev, void*) {
778     tmsg m;
779     atransport* t;
780 
781     if (!(ev & FDE_READ)) {
782         return;
783     }
784 
785     if (transport_read_action(_fd, &m)) {
786         PLOG(FATAL) << "cannot read transport registration socket";
787     }
788 
789     t = m.transport;
790 
791     if (m.action == 0) {
792         D("transport: %s deleting", t->serial.c_str());
793 
794         {
795             std::lock_guard<std::recursive_mutex> lock(transport_lock);
796             transport_list.remove(t);
797         }
798 
799         delete t;
800 
801         update_transports();
802         return;
803     }
804 
805     /* don't create transport threads for inaccessible devices */
806     if (t->GetConnectionState() != kCsNoPerm) {
807         t->connection()->SetTransport(t);
808 
809         if (t->type == kTransportUsb && usb_devices_start_detached()) {
810             t->SetConnectionState(kCsDetached);
811         } else {
812             t->connection()->Start();
813 #if ADB_HOST
814             send_connect(t);
815 #endif
816         }
817     }
818 
819     {
820         std::lock_guard<std::recursive_mutex> lock(transport_lock);
821         auto it = std::find(pending_list.begin(), pending_list.end(), t);
822         if (it != pending_list.end()) {
823             pending_list.remove(t);
824             transport_list.push_front(t);
825         }
826     }
827 
828     update_transports();
829 }
830 
831 #if ADB_HOST
init_reconnect_handler(void)832 void init_reconnect_handler(void) {
833     reconnect_handler.Start();
834 }
835 #endif
836 
init_transport_registration(void)837 void init_transport_registration(void) {
838     int s[2];
839 
840     if (adb_socketpair(s)) {
841         PLOG(FATAL) << "cannot open transport registration socketpair";
842     }
843     D("socketpair: (%d,%d)", s[0], s[1]);
844 
845     transport_registration_send = s[0];
846     transport_registration_recv = s[1];
847 
848     transport_registration_fde =
849             fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
850     fdevent_set(transport_registration_fde, FDE_READ);
851 }
852 
kick_all_transports()853 void kick_all_transports() {
854 #if ADB_HOST
855     reconnect_handler.Stop();
856 #endif
857     // To avoid only writing part of a packet to a transport after exit, kick all transports.
858     std::lock_guard<std::recursive_mutex> lock(transport_lock);
859     for (auto t : transport_list) {
860         t->Kick();
861     }
862 }
863 
kick_all_tcp_tls_transports()864 void kick_all_tcp_tls_transports() {
865     std::lock_guard<std::recursive_mutex> lock(transport_lock);
866     for (auto t : transport_list) {
867         if (t->IsTcpDevice() && t->use_tls) {
868             t->Kick();
869         }
870     }
871 }
872 
873 #if !ADB_HOST
kick_all_transports_by_auth_key(std::string_view auth_key)874 void kick_all_transports_by_auth_key(std::string_view auth_key) {
875     std::lock_guard<std::recursive_mutex> lock(transport_lock);
876     for (auto t : transport_list) {
877         if (auth_key == t->auth_key) {
878             t->Kick();
879         }
880     }
881 }
882 #endif
883 
884 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)885 void register_transport(atransport* transport) {
886     tmsg m;
887     m.transport = transport;
888     m.action = 1;
889     D("transport: %s registered", transport->serial.c_str());
890     if (transport_write_action(transport_registration_send, &m)) {
891         PLOG(FATAL) << "cannot write transport registration socket";
892     }
893 }
894 
remove_transport(atransport * transport)895 static void remove_transport(atransport* transport) {
896     tmsg m;
897     m.transport = transport;
898     m.action = 0;
899     D("transport: %s removed", transport->serial.c_str());
900     if (transport_write_action(transport_registration_send, &m)) {
901         PLOG(FATAL) << "cannot write transport registration socket";
902     }
903 }
904 
transport_destroy(atransport * t)905 static void transport_destroy(atransport* t) {
906     check_main_thread();
907     CHECK(t != nullptr);
908 
909     std::lock_guard<std::recursive_mutex> lock(transport_lock);
910     LOG(INFO) << "destroying transport " << t->serial_name();
911     t->connection()->Stop();
912 #if ADB_HOST
913     if (t->IsTcpDevice() && !t->kicked()) {
914         D("transport: %s destroy (attempting reconnection)", t->serial.c_str());
915 
916         // We need to clear the transport's keys, so that on the next connection, it tries
917         // again from the beginning.
918         t->ResetKeys();
919         reconnect_handler.TrackTransport(t);
920         return;
921     }
922 #endif
923 
924     D("transport: %s destroy (kicking and closing)", t->serial.c_str());
925     remove_transport(t);
926 }
927 
928 #if ADB_HOST
qual_match(const std::string & to_test,const char * prefix,const std::string & qual,bool sanitize_qual)929 static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
930                       bool sanitize_qual) {
931     if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
932         return qual.empty();
933 
934     if (qual.empty()) return 0;
935 
936     const char* ptr = to_test.c_str();
937     if (prefix) {
938         while (*prefix) {
939             if (*prefix++ != *ptr++) return 0;
940         }
941     }
942 
943     for (char ch : qual) {
944         if (sanitize_qual && !isalnum(ch)) ch = '_';
945         if (ch != *ptr++) return 0;
946     }
947 
948     /* Everything matched so far.  Return true if *ptr is a NUL. */
949     return !*ptr;
950 }
951 
952 // Contains either a device serial string or a USB device address like "usb:2-6"
953 const char* __transport_server_one_device = nullptr;
954 
transport_set_one_device(const char * adb_one_device)955 void transport_set_one_device(const char* adb_one_device) {
956     __transport_server_one_device = adb_one_device;
957 }
958 
transport_get_one_device()959 const char* transport_get_one_device() {
960     return __transport_server_one_device;
961 }
962 
transport_server_owns_device(std::string_view serial)963 bool transport_server_owns_device(std::string_view serial) {
964     if (!__transport_server_one_device) {
965         // If the server doesn't own one device, server owns all devices.
966         return true;
967     }
968     return serial.compare(__transport_server_one_device) == 0;
969 }
970 
transport_server_owns_device(std::string_view dev_path,std::string_view serial)971 bool transport_server_owns_device(std::string_view dev_path, std::string_view serial) {
972     if (!__transport_server_one_device) {
973         // If the server doesn't own one device, server owns all devices.
974         return true;
975     }
976     return serial.compare(__transport_server_one_device) == 0 ||
977            dev_path.compare(__transport_server_one_device) == 0;
978 }
979 
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)980 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
981                                   bool* is_ambiguous, std::string* error_out,
982                                   bool accept_any_state) {
983     atransport* result = nullptr;
984 
985     if (transport_id != 0) {
986         *error_out = android::base::StringPrintf("no device with transport id '%" PRIu64 "'",
987                                                  transport_id);
988     } else if (serial) {
989         *error_out = android::base::StringPrintf("device '%s' not found", serial);
990     } else if (type == kTransportLocal) {
991         *error_out = "no emulators found";
992     } else if (type == kTransportAny) {
993         *error_out = "no devices/emulators found";
994     } else {
995         *error_out = "no devices found";
996     }
997 
998     std::unique_lock<std::recursive_mutex> lock(transport_lock);
999     for (const auto& t : transport_list) {
1000         if (t->GetConnectionState() == kCsNoPerm) {
1001             *error_out = UsbNoPermissionsLongHelpText();
1002             continue;
1003         }
1004 
1005         if (transport_id) {
1006             if (t->id == transport_id) {
1007                 result = t;
1008                 break;
1009             }
1010         } else if (serial) {
1011             if (t->MatchesTarget(serial)) {
1012                 if (result) {
1013                     *error_out = "more than one device";
1014                     if (is_ambiguous) *is_ambiguous = true;
1015                     result = nullptr;
1016                     break;
1017                 }
1018                 result = t;
1019             }
1020         } else {
1021             if (type == kTransportUsb && t->type == kTransportUsb) {
1022                 if (result) {
1023                     *error_out = "more than one device";
1024                     if (is_ambiguous) *is_ambiguous = true;
1025                     result = nullptr;
1026                     break;
1027                 }
1028                 result = t;
1029             } else if (type == kTransportLocal && t->type == kTransportLocal) {
1030                 if (result) {
1031                     *error_out = "more than one emulator";
1032                     if (is_ambiguous) *is_ambiguous = true;
1033                     result = nullptr;
1034                     break;
1035                 }
1036                 result = t;
1037             } else if (type == kTransportAny) {
1038                 if (result) {
1039                     *error_out = "more than one device/emulator";
1040                     if (is_ambiguous) *is_ambiguous = true;
1041                     result = nullptr;
1042                     break;
1043                 }
1044                 result = t;
1045             }
1046         }
1047     }
1048     lock.unlock();
1049 
1050     if (result && !accept_any_state) {
1051         // The caller requires an active transport.
1052         // Make sure that we're actually connected.
1053         ConnectionState state = result->GetConnectionState();
1054         switch (state) {
1055             case kCsConnecting:
1056                 *error_out = "device still connecting";
1057                 result = nullptr;
1058                 break;
1059 
1060             case kCsAuthorizing:
1061                 *error_out = "device still authorizing";
1062                 result = nullptr;
1063                 break;
1064 
1065             case kCsUnauthorized: {
1066                 *error_out = "device unauthorized.\n";
1067                 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
1068                 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
1069                 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
1070                 *error_out += "\n";
1071                 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
1072                 *error_out += "Otherwise check for a confirmation dialog on your device.";
1073                 result = nullptr;
1074                 break;
1075             }
1076 
1077             case kCsOffline:
1078                 *error_out = "device offline";
1079                 result = nullptr;
1080                 break;
1081 
1082             default:
1083                 break;
1084         }
1085     }
1086 
1087     if (result) {
1088         *error_out = "success";
1089     }
1090 
1091     return result;
1092 }
1093 
WaitForConnection(std::chrono::milliseconds timeout)1094 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
1095     std::unique_lock<std::mutex> lock(mutex_);
1096     ScopedLockAssertion assume_locked(mutex_);
1097     return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
1098         return connection_established_ready_;
1099     }) && connection_established_;
1100 }
1101 
SetConnectionEstablished(bool success)1102 void ConnectionWaitable::SetConnectionEstablished(bool success) {
1103     {
1104         std::lock_guard<std::mutex> lock(mutex_);
1105         if (connection_established_ready_) return;
1106         connection_established_ready_ = true;
1107         connection_established_ = success;
1108         D("connection established with %d", success);
1109     }
1110     cv_.notify_one();
1111 }
1112 #endif
1113 
~atransport()1114 atransport::~atransport() {
1115 #if ADB_HOST
1116     // If the connection callback had not been run before, run it now.
1117     SetConnectionEstablished(false);
1118 #endif
1119 }
1120 
Write(apacket * p)1121 int atransport::Write(apacket* p) {
1122     return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
1123 }
1124 
Reset()1125 void atransport::Reset() {
1126     if (!kicked_.exchange(true)) {
1127         LOG(INFO) << "resetting transport " << this << " " << this->serial;
1128         this->connection()->Reset();
1129     }
1130 }
1131 
Kick()1132 void atransport::Kick() {
1133     if (!kicked_.exchange(true)) {
1134         LOG(INFO) << "kicking transport " << this << " " << this->serial;
1135         this->connection()->Stop();
1136     }
1137 }
1138 
GetConnectionState() const1139 ConnectionState atransport::GetConnectionState() const {
1140     return connection_state_;
1141 }
1142 
SetConnectionState(ConnectionState state)1143 void atransport::SetConnectionState(ConnectionState state) {
1144     check_main_thread();
1145     connection_state_ = state;
1146     update_transports();
1147 }
1148 
1149 #if ADB_HOST
Attach(std::string * error)1150 bool atransport::Attach(std::string* error) {
1151     D("%s: attach", serial.c_str());
1152     check_main_thread();
1153 
1154     if (!should_use_libusb()) {
1155         *error = "attach/detach only implemented for libusb backend";
1156         return false;
1157     }
1158 
1159     if (GetConnectionState() != ConnectionState::kCsDetached) {
1160         *error = android::base::StringPrintf("transport %s is not detached", serial.c_str());
1161         return false;
1162     }
1163 
1164     ResetKeys();
1165 
1166     {
1167         std::lock_guard<std::mutex> lock(mutex_);
1168         if (!connection_->Attach(error)) {
1169             return false;
1170         }
1171     }
1172 
1173     send_connect(this);
1174     return true;
1175 }
1176 
Detach(std::string * error)1177 bool atransport::Detach(std::string* error) {
1178     D("%s: detach", serial.c_str());
1179     check_main_thread();
1180 
1181     if (!should_use_libusb()) {
1182         *error = "attach/detach only implemented for libusb backend";
1183         return false;
1184     }
1185 
1186     if (GetConnectionState() == ConnectionState::kCsDetached) {
1187         *error = android::base::StringPrintf("transport %s is already detached", serial.c_str());
1188         return false;
1189     }
1190 
1191     handle_offline(this);
1192 
1193     {
1194         std::lock_guard<std::mutex> lock(mutex_);
1195         if (!connection_->Detach(error)) {
1196             return false;
1197         }
1198     }
1199 
1200     this->SetConnectionState(kCsDetached);
1201     return true;
1202 }
1203 #endif
1204 
SetConnection(std::shared_ptr<Connection> connection)1205 void atransport::SetConnection(std::shared_ptr<Connection> connection) {
1206     std::lock_guard<std::mutex> lock(mutex_);
1207     connection_ = std::shared_ptr<Connection>(std::move(connection));
1208 }
1209 
HandleRead(std::unique_ptr<apacket> p)1210 bool atransport::HandleRead(std::unique_ptr<apacket> p) {
1211     if (!check_header(p.get(), this)) {
1212         D("%s: remote read: bad header", serial.c_str());
1213         return false;
1214     }
1215 
1216     VLOG(TRANSPORT) << dump_packet(serial.c_str(), "from remote", p.get());
1217     apacket* packet = p.release();
1218 
1219     // TODO: Does this need to run on the main thread?
1220     fdevent_run_on_main_thread([packet, this]() { handle_packet(packet, this); });
1221     return true;
1222 }
1223 
HandleError(const std::string & error)1224 void atransport::HandleError(const std::string& error) {
1225     LOG(INFO) << serial_name() << ": connection terminated: " << error;
1226     fdevent_run_on_main_thread([this]() {
1227         handle_offline(this);
1228         transport_destroy(this);
1229     });
1230 }
1231 
update_version(int version,size_t payload)1232 void atransport::update_version(int version, size_t payload) {
1233     protocol_version = std::min(version, A_VERSION);
1234     max_payload = std::min(payload, MAX_PAYLOAD);
1235 }
1236 
get_protocol_version() const1237 int atransport::get_protocol_version() const {
1238     return protocol_version;
1239 }
1240 
get_tls_version() const1241 int atransport::get_tls_version() const {
1242     return tls_version;
1243 }
1244 
get_max_payload() const1245 size_t atransport::get_max_payload() const {
1246     return max_payload;
1247 }
1248 
supported_features()1249 const FeatureSet& supported_features() {
1250     static const android::base::NoDestructor<FeatureSet> features([] {
1251         return FeatureSet{
1252                 kFeatureShell2,
1253                 kFeatureCmd,
1254                 kFeatureStat2,
1255                 kFeatureLs2,
1256                 kFeatureFixedPushMkdir,
1257                 kFeatureApex,
1258                 kFeatureAbb,
1259                 kFeatureFixedPushSymlinkTimestamp,
1260                 kFeatureAbbExec,
1261                 kFeatureRemountShell,
1262                 kFeatureTrackApp,
1263                 kFeatureSendRecv2,
1264                 kFeatureSendRecv2Brotli,
1265                 kFeatureSendRecv2LZ4,
1266                 kFeatureSendRecv2Zstd,
1267                 kFeatureSendRecv2DryRunSend,
1268                 kFeatureOpenscreenMdns,
1269                 // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1270                 // to know about. Otherwise, the client can be stuck running an old
1271                 // version of the server even after upgrading their copy of adb.
1272                 // (http://b/24370690)
1273         };
1274     }());
1275 
1276     return *features;
1277 }
1278 
FeatureSetToString(const FeatureSet & features)1279 std::string FeatureSetToString(const FeatureSet& features) {
1280     return android::base::Join(features, ',');
1281 }
1282 
StringToFeatureSet(const std::string & features_string)1283 FeatureSet StringToFeatureSet(const std::string& features_string) {
1284     if (features_string.empty()) {
1285         return FeatureSet();
1286     }
1287 
1288     return android::base::Split(features_string, ",");
1289 }
1290 
1291 template <class Range, class Value>
contains(const Range & r,const Value & v)1292 static bool contains(const Range& r, const Value& v) {
1293     return std::find(std::begin(r), std::end(r), v) != std::end(r);
1294 }
1295 
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1296 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1297     return contains(feature_set, feature) && contains(supported_features(), feature);
1298 }
1299 
has_feature(const std::string & feature) const1300 bool atransport::has_feature(const std::string& feature) const {
1301     return contains(features_, feature);
1302 }
1303 
SetFeatures(const std::string & features_string)1304 void atransport::SetFeatures(const std::string& features_string) {
1305     features_ = StringToFeatureSet(features_string);
1306 }
1307 
AddDisconnect(adisconnect * disconnect)1308 void atransport::AddDisconnect(adisconnect* disconnect) {
1309     disconnects_.push_back(disconnect);
1310 }
1311 
RemoveDisconnect(adisconnect * disconnect)1312 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1313     disconnects_.remove(disconnect);
1314 }
1315 
RunDisconnects()1316 void atransport::RunDisconnects() {
1317     for (const auto& disconnect : disconnects_) {
1318         disconnect->func(disconnect->opaque, this);
1319     }
1320     disconnects_.clear();
1321 }
1322 
1323 #if ADB_HOST
MatchesTarget(const std::string & target) const1324 bool atransport::MatchesTarget(const std::string& target) const {
1325     if (!serial.empty()) {
1326         if (target == serial) {
1327             return true;
1328         } else if (type == kTransportLocal) {
1329             // Local transports can match [tcp:|udp:]<hostname>[:port].
1330             const char* local_target_ptr = target.c_str();
1331 
1332             // For fastboot compatibility, ignore protocol prefixes.
1333             if (android::base::StartsWith(target, "tcp:") ||
1334                 android::base::StartsWith(target, "udp:")) {
1335                 local_target_ptr += 4;
1336             }
1337 
1338             // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1339             std::string serial_host, error;
1340             int serial_port = -1;
1341             if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr,
1342                                                &error)) {
1343                 // |target| may omit the port to default to ours.
1344                 std::string target_host;
1345                 int target_port = serial_port;
1346                 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1347                                                    nullptr, &error) &&
1348                     serial_host == target_host && serial_port == target_port) {
1349                     return true;
1350                 }
1351             }
1352         }
1353     }
1354 
1355     return (target == devpath) || qual_match(target, "product:", product, false) ||
1356            qual_match(target, "model:", model, true) ||
1357            qual_match(target, "device:", device, false);
1358 }
1359 
SetConnectionEstablished(bool success)1360 void atransport::SetConnectionEstablished(bool success) {
1361     connection_waitable_->SetConnectionEstablished(success);
1362 }
1363 
Reconnect()1364 ReconnectResult atransport::Reconnect() {
1365     return reconnect_(this);
1366 }
1367 
1368 // We use newline as our delimiter, make sure to never output it.
sanitize(std::string str,bool alphanumeric)1369 static std::string sanitize(std::string str, bool alphanumeric) {
1370     auto pred = alphanumeric ? [](const char c) { return !isalnum(c); }
1371                              : [](const char c) { return c == '\n'; };
1372     std::replace_if(str.begin(), str.end(), pred, '_');
1373     return str;
1374 }
1375 
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1376 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1377                                   bool alphanumeric) {
1378     if (value.empty()) {
1379         return;
1380     }
1381 
1382     *result += ' ';
1383     *result += key;
1384     *result += sanitize(value, alphanumeric);
1385 }
1386 
append_transport(const atransport * t,std::string * result,bool long_listing)1387 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1388     std::string serial = t->serial;
1389     if (serial.empty()) {
1390         serial = "(no serial number)";
1391     }
1392 
1393     if (!long_listing) {
1394         *result += serial;
1395         *result += '\t';
1396         *result += to_string(t->GetConnectionState());
1397     } else {
1398         android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1399                                      to_string(t->GetConnectionState()).c_str());
1400 
1401         append_transport_info(result, "", t->devpath, false);
1402         append_transport_info(result, "product:", t->product, false);
1403         append_transport_info(result, "model:", t->model, true);
1404         append_transport_info(result, "device:", t->device, false);
1405 
1406         // Put id at the end, so that anyone parsing the output here can always find it by scanning
1407         // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1408         *result += " transport_id:";
1409         *result += std::to_string(t->id);
1410     }
1411     *result += '\n';
1412 }
1413 
list_transports(bool long_listing)1414 std::string list_transports(bool long_listing) {
1415     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1416 
1417     auto sorted_transport_list = transport_list;
1418     sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1419         if (x->type != y->type) {
1420             return x->type < y->type;
1421         }
1422         return x->serial < y->serial;
1423     });
1424 
1425     std::string result;
1426     for (const auto& t : sorted_transport_list) {
1427         append_transport(t, &result, long_listing);
1428     }
1429     return result;
1430 }
1431 
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1432 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1433     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1434     for (auto& t : transport_list) {
1435         if (predicate(t)) {
1436             if (reset) {
1437                 t->Reset();
1438             } else {
1439                 t->Kick();
1440             }
1441         }
1442     }
1443 }
1444 
1445 /* hack for osx */
close_usb_devices(bool reset)1446 void close_usb_devices(bool reset) {
1447     close_usb_devices([](const atransport*) { return true; }, reset);
1448 }
1449 #endif
1450 
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,bool use_tls,int * error)1451 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1452                                atransport::ReconnectCallback reconnect, bool use_tls, int* error) {
1453     atransport* t = new atransport(std::move(reconnect), kCsOffline);
1454     t->use_tls = use_tls;
1455 
1456     D("transport: %s init'ing for socket %d, on port %d", serial.c_str(), s.get(), port);
1457     if (init_socket_transport(t, std::move(s), port, local) < 0) {
1458         delete t;
1459         if (error) *error = errno;
1460         return false;
1461     }
1462 
1463     std::unique_lock<std::recursive_mutex> lock(transport_lock);
1464     for (const auto& transport : pending_list) {
1465         if (serial == transport->serial) {
1466             VLOG(TRANSPORT) << "socket transport " << transport->serial
1467                             << " is already in pending_list and fails to register";
1468             delete t;
1469             if (error) *error = EALREADY;
1470             return false;
1471         }
1472     }
1473 
1474     for (const auto& transport : transport_list) {
1475         if (serial == transport->serial) {
1476             VLOG(TRANSPORT) << "socket transport " << transport->serial
1477                             << " is already in transport_list and fails to register";
1478             delete t;
1479             if (error) *error = EALREADY;
1480             return false;
1481         }
1482     }
1483 
1484     t->serial = std::move(serial);
1485     pending_list.push_front(t);
1486 
1487     lock.unlock();
1488 
1489 #if ADB_HOST
1490     auto waitable = t->connection_waitable();
1491 #endif
1492     register_transport(t);
1493 
1494     if (local == 1) {
1495         // Do not wait for emulator transports.
1496         return true;
1497     }
1498 
1499 #if ADB_HOST
1500     if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1501         if (error) *error = ETIMEDOUT;
1502         return false;
1503     }
1504 
1505     if (t->GetConnectionState() == kCsUnauthorized) {
1506         if (error) *error = EPERM;
1507         return false;
1508     }
1509 #endif
1510 
1511     return true;
1512 }
1513 
1514 #if ADB_HOST
find_transport(const char * serial)1515 atransport* find_transport(const char* serial) {
1516     atransport* result = nullptr;
1517 
1518     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1519     for (auto& t : transport_list) {
1520         if (strcmp(serial, t->serial.c_str()) == 0) {
1521             result = t;
1522             break;
1523         }
1524     }
1525 
1526     return result;
1527 }
1528 
kick_all_tcp_devices()1529 void kick_all_tcp_devices() {
1530     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1531     for (auto& t : transport_list) {
1532         if (t->IsTcpDevice()) {
1533             // Kicking breaks the read_transport thread of this transport out of any read, then
1534             // the read_transport thread will notify the main thread to make this transport
1535             // offline. Then the main thread will notify the write_transport thread to exit.
1536             // Finally, this transport will be closed and freed in the main thread.
1537             t->Kick();
1538         }
1539     }
1540     reconnect_handler.CheckForKicked();
1541 }
1542 
register_usb_transport(std::shared_ptr<Connection> connection,const char * serial,const char * devpath,unsigned writeable)1543 void register_usb_transport(std::shared_ptr<Connection> connection, const char* serial,
1544                             const char* devpath, unsigned writeable) {
1545     atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1546     if (serial) {
1547         t->serial = serial;
1548     }
1549     if (devpath) {
1550         t->devpath = devpath;
1551     }
1552 
1553     t->SetConnection(std::move(connection));
1554     t->type = kTransportUsb;
1555 
1556     {
1557         std::lock_guard<std::recursive_mutex> lock(transport_lock);
1558         pending_list.push_front(t);
1559     }
1560 
1561     register_transport(t);
1562 }
1563 
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1564 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1565                             unsigned writeable) {
1566     atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1567 
1568     D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1569     init_usb_transport(t, usb);
1570     if (serial) {
1571         t->serial = serial;
1572     }
1573 
1574     if (devpath) {
1575         t->devpath = devpath;
1576     }
1577 
1578     {
1579         std::lock_guard<std::recursive_mutex> lock(transport_lock);
1580         pending_list.push_front(t);
1581     }
1582 
1583     register_transport(t);
1584 }
1585 
1586 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1587 void unregister_usb_transport(usb_handle* usb) {
1588     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1589     transport_list.remove_if([usb](atransport* t) {
1590         return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1591     });
1592 }
1593 #endif
1594 
check_header(apacket * p,atransport * t)1595 bool check_header(apacket* p, atransport* t) {
1596     if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1597         VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1598                   << ", magic = " << p->msg.magic;
1599         return false;
1600     }
1601 
1602     if (p->msg.data_length > t->get_max_payload()) {
1603         VLOG(RWX) << "check_header(): " << p->msg.data_length
1604                   << " atransport::max_payload = " << t->get_max_payload();
1605         return false;
1606     }
1607 
1608     return true;
1609 }
1610 
1611 #if ADB_HOST
Key()1612 std::shared_ptr<RSA> atransport::Key() {
1613     if (keys_.empty()) {
1614         return nullptr;
1615     }
1616 
1617     std::shared_ptr<RSA> result = keys_[0];
1618     return result;
1619 }
1620 
NextKey()1621 std::shared_ptr<RSA> atransport::NextKey() {
1622     if (keys_.empty()) {
1623         LOG(INFO) << "fetching keys for transport " << this->serial_name();
1624         keys_ = adb_auth_get_private_keys();
1625 
1626         // We should have gotten at least one key: the one that's automatically generated.
1627         CHECK(!keys_.empty());
1628     } else {
1629         keys_.pop_front();
1630     }
1631 
1632     return Key();
1633 }
1634 
ResetKeys()1635 void atransport::ResetKeys() {
1636     keys_.clear();
1637 }
1638 #endif
1639