• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG TRANSPORT
18 
19 #include "sysdeps.h"
20 
21 #include "transport.h"
22 
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 
31 #include <algorithm>
32 #include <list>
33 #include <memory>
34 #include <mutex>
35 #include <set>
36 #include <string>
37 #include <thread>
38 
39 #include <adb/crypto/rsa_2048_key.h>
40 #include <adb/crypto/x509_generator.h>
41 #include <adb/tls/tls_connection.h>
42 #include <android-base/logging.h>
43 #include <android-base/no_destructor.h>
44 #include <android-base/parsenetaddress.h>
45 #include <android-base/stringprintf.h>
46 #include <android-base/strings.h>
47 #include <android-base/thread_annotations.h>
48 #include <diagnose_usb.h>
49 
50 #include "adb.h"
51 #include "adb_auth.h"
52 #include "adb_io.h"
53 #include "adb_trace.h"
54 #include "adb_utils.h"
55 #include "fdevent/fdevent.h"
56 #include "sysdeps/chrono.h"
57 
58 #if ADB_HOST
59 #include "client/usb.h"
60 #endif
61 
62 using namespace adb::crypto;
63 using namespace adb::tls;
64 using namespace std::string_literals;
65 using android::base::ScopedLockAssertion;
66 using TlsError = TlsConnection::TlsError;
67 
68 static void remove_transport(atransport* transport);
69 static void transport_destroy(atransport* transport);
70 
71 // TODO: unordered_map<TransportId, atransport*>
72 static auto& transport_list = *new std::list<atransport*>();
73 static auto& pending_list = *new std::list<atransport*>();
74 
75 static auto& transport_lock = *new std::recursive_mutex();
76 
77 const char* const kFeatureShell2 = "shell_v2";
78 const char* const kFeatureCmd = "cmd";
79 const char* const kFeatureStat2 = "stat_v2";
80 const char* const kFeatureLs2 = "ls_v2";
81 const char* const kFeatureLibusb = "libusb";
82 const char* const kFeaturePushSync = "push_sync";
83 const char* const kFeatureApex = "apex";
84 const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
85 const char* const kFeatureAbb = "abb";
86 const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
87 const char* const kFeatureAbbExec = "abb_exec";
88 const char* const kFeatureRemountShell = "remount_shell";
89 const char* const kFeatureTrackApp = "track_app";
90 const char* const kFeatureSendRecv2 = "sendrecv_v2";
91 const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
92 const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
93 const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd";
94 const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send";
95 const char* const kFeatureDelayedAck = "delayed_ack";
96 // TODO(joshuaduong): Bump to v2 when openscreen discovery is enabled by default
97 const char* const kFeatureOpenscreenMdns = "openscreen_mdns";
98 
99 namespace {
100 
101 #if ADB_HOST
102 
103 // Tracks and handles atransport*s that are attempting reconnection.
104 class ReconnectHandler {
105   public:
106     ReconnectHandler() = default;
107     ~ReconnectHandler() = default;
108 
109     // Starts the ReconnectHandler thread.
110     void Start();
111 
112     // Requests the ReconnectHandler thread to stop.
113     void Stop();
114 
115     // Adds the atransport* to the queue of reconnect attempts.
116     void TrackTransport(atransport* transport);
117 
118     // Wake up the ReconnectHandler thread to have it check for kicked transports.
119     void CheckForKicked();
120 
121   private:
122     // The main thread loop.
123     void Run();
124 
125     // Tracks a reconnection attempt.
126     struct ReconnectAttempt {
127         atransport* transport;
128         std::chrono::steady_clock::time_point reconnect_time;
129         size_t attempts_left;
130 
operator <__anonc80066ee0111::ReconnectHandler::ReconnectAttempt131         bool operator<(const ReconnectAttempt& rhs) const {
132             if (reconnect_time == rhs.reconnect_time) {
133                 return reinterpret_cast<uintptr_t>(transport) <
134                        reinterpret_cast<uintptr_t>(rhs.transport);
135             }
136             return reconnect_time < rhs.reconnect_time;
137         }
138     };
139 
140     // Only retry for up to one minute.
141     static constexpr const std::chrono::seconds kDefaultTimeout = 3s;
142     static constexpr const size_t kMaxAttempts = 20;
143 
144     // Protects all members.
145     std::mutex reconnect_mutex_;
146     bool running_ GUARDED_BY(reconnect_mutex_) = true;
147     std::thread handler_thread_;
148     std::condition_variable reconnect_cv_;
149     std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
150 
151     DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
152 };
153 
Start()154 void ReconnectHandler::Start() {
155     fdevent_check_looper();
156     handler_thread_ = std::thread(&ReconnectHandler::Run, this);
157 }
158 
Stop()159 void ReconnectHandler::Stop() {
160     fdevent_check_looper();
161     {
162         std::lock_guard<std::mutex> lock(reconnect_mutex_);
163         running_ = false;
164     }
165     reconnect_cv_.notify_one();
166     handler_thread_.join();
167 
168     // Drain the queue to free all resources.
169     std::lock_guard<std::mutex> lock(reconnect_mutex_);
170     while (!reconnect_queue_.empty()) {
171         ReconnectAttempt attempt = *reconnect_queue_.begin();
172         reconnect_queue_.erase(reconnect_queue_.begin());
173         remove_transport(attempt.transport);
174     }
175 }
176 
TrackTransport(atransport * transport)177 void ReconnectHandler::TrackTransport(atransport* transport) {
178     fdevent_check_looper();
179     {
180         std::lock_guard<std::mutex> lock(reconnect_mutex_);
181         if (!running_) return;
182         // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
183         auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
184         reconnect_queue_.emplace(
185                 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
186     }
187     reconnect_cv_.notify_one();
188 }
189 
CheckForKicked()190 void ReconnectHandler::CheckForKicked() {
191     reconnect_cv_.notify_one();
192 }
193 
Run()194 void ReconnectHandler::Run() {
195     while (true) {
196         ReconnectAttempt attempt;
197         {
198             std::unique_lock<std::mutex> lock(reconnect_mutex_);
199             ScopedLockAssertion assume_lock(reconnect_mutex_);
200 
201             if (!reconnect_queue_.empty()) {
202                 // FIXME: libstdc++ (used on Windows) implements condition_variable with
203                 //        system_clock as its clock, so we're probably hosed if the clock changes,
204                 //        even if we use steady_clock throughout. This problem goes away once we
205                 //        switch to libc++.
206                 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
207             } else {
208                 reconnect_cv_.wait(lock);
209             }
210 
211             if (!running_) return;
212 
213             // Scan the whole list for kicked transports, so that we immediately handle an explicit
214             // disconnect request.
215             for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
216                 if (it->transport->kicked()) {
217                     D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
218                     remove_transport(it->transport);
219                     it = reconnect_queue_.erase(it);
220                 } else {
221                     ++it;
222                 }
223             }
224 
225             if (reconnect_queue_.empty()) continue;
226 
227             // Go back to sleep if we either woke up spuriously, or we were woken up to remove
228             // a kicked transport, and the first transport isn't ready for reconnection yet.
229             auto now = std::chrono::steady_clock::now();
230             if (reconnect_queue_.begin()->reconnect_time > now) {
231                 continue;
232             }
233 
234             attempt = *reconnect_queue_.begin();
235             reconnect_queue_.erase(reconnect_queue_.begin());
236         }
237         D("attempting to reconnect %s", attempt.transport->serial.c_str());
238 
239         switch (attempt.transport->Reconnect()) {
240             case ReconnectResult::Retry: {
241                 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
242                 if (attempt.attempts_left == 0) {
243                     D("transport %s exceeded the number of retry attempts. giving up on it.",
244                       attempt.transport->serial.c_str());
245                     remove_transport(attempt.transport);
246                     continue;
247                 }
248 
249                 std::lock_guard<std::mutex> lock(reconnect_mutex_);
250                 reconnect_queue_.emplace(ReconnectAttempt{
251                         attempt.transport,
252                         std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
253                         attempt.attempts_left - 1});
254                 continue;
255             }
256 
257             case ReconnectResult::Success:
258                 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
259                 register_transport(attempt.transport);
260                 continue;
261 
262             case ReconnectResult::Abort:
263                 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
264                 remove_transport(attempt.transport);
265                 continue;
266         }
267     }
268 }
269 
270 static auto& reconnect_handler = *new ReconnectHandler();
271 
272 #endif
273 
274 }  // namespace
275 
NextTransportId()276 TransportId NextTransportId() {
277     static std::atomic<TransportId> next(1);
278     return next++;
279 }
280 
Reset()281 void Connection::Reset() {
282     LOG(INFO) << "Connection::Reset(): stopping";
283     Stop();
284 }
285 
Serial() const286 std::string Connection::Serial() const {
287     return transport_ ? transport_->serial_name() : "<unknown>";
288 }
289 
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)290 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
291     : underlying_(std::move(connection)) {}
292 
~BlockingConnectionAdapter()293 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
294     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): destructing";
295     Stop();
296 }
297 
Start()298 void BlockingConnectionAdapter::Start() {
299     std::lock_guard<std::mutex> lock(mutex_);
300     if (started_) {
301         LOG(FATAL) << "BlockingConnectionAdapter(" << Serial() << "): started multiple times";
302     }
303 
304     StartReadThread();
305 
306     write_thread_ = std::thread([this]() {
307         LOG(INFO) << Serial() << ": write thread spawning";
308         while (true) {
309             std::unique_lock<std::mutex> lock(mutex_);
310             ScopedLockAssertion assume_locked(mutex_);
311             cv_.wait(lock, [this]() REQUIRES(mutex_) {
312                 return this->stopped_ || !this->write_queue_.empty();
313             });
314 
315             if (this->stopped_) {
316                 return;
317             }
318 
319             std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
320             this->write_queue_.pop_front();
321             lock.unlock();
322 
323             if (!this->underlying_->Write(packet.get())) {
324                 break;
325             }
326         }
327         std::call_once(this->error_flag_, [this]() { transport_->HandleError("write failed"); });
328     });
329 
330     started_ = true;
331 }
332 
StartReadThread()333 void BlockingConnectionAdapter::StartReadThread() {
334     read_thread_ = std::thread([this]() {
335         LOG(INFO) << Serial() << ": read thread spawning";
336         while (true) {
337             auto packet = std::make_unique<apacket>();
338             if (!underlying_->Read(packet.get())) {
339                 PLOG(INFO) << Serial() << ": read failed";
340                 break;
341             }
342 
343             bool got_stls_cmd = false;
344             if (packet->msg.command == A_STLS) {
345                 got_stls_cmd = true;
346             }
347 
348             transport_->HandleRead(std::move(packet));
349 
350             // If we received the STLS packet, we are about to perform the TLS
351             // handshake. So this read thread must stop and resume after the
352             // handshake completes otherwise this will interfere in the process.
353             if (got_stls_cmd) {
354                 LOG(INFO) << Serial() << ": Received STLS packet. Stopping read thread.";
355                 return;
356             }
357         }
358         std::call_once(this->error_flag_, [this]() { transport_->HandleError("read failed"); });
359     });
360 }
361 
DoTlsHandshake(RSA * key,std::string * auth_key)362 bool BlockingConnectionAdapter::DoTlsHandshake(RSA* key, std::string* auth_key) {
363     std::lock_guard<std::mutex> lock(mutex_);
364     if (read_thread_.joinable()) {
365         read_thread_.join();
366     }
367     bool success = this->underlying_->DoTlsHandshake(key, auth_key);
368     StartReadThread();
369     return success;
370 }
371 
Reset()372 void BlockingConnectionAdapter::Reset() {
373     {
374         std::lock_guard<std::mutex> lock(mutex_);
375         if (!started_) {
376             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
377             return;
378         }
379 
380         if (stopped_) {
381             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
382             return;
383         }
384     }
385 
386     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): resetting";
387     this->underlying_->Reset();
388     Stop();
389 }
390 
Stop()391 void BlockingConnectionAdapter::Stop() {
392     {
393         std::lock_guard<std::mutex> lock(mutex_);
394         if (!started_) {
395             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
396             return;
397         }
398 
399         if (stopped_) {
400             LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
401             return;
402         }
403 
404         stopped_ = true;
405     }
406 
407     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopping";
408 
409     this->underlying_->Close();
410     this->cv_.notify_one();
411 
412     // Move the threads out into locals with the lock taken, and then unlock to let them exit.
413     std::thread read_thread;
414     std::thread write_thread;
415 
416     {
417         std::lock_guard<std::mutex> lock(mutex_);
418         read_thread = std::move(read_thread_);
419         write_thread = std::move(write_thread_);
420     }
421 
422     read_thread.join();
423     write_thread.join();
424 
425     LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopped";
426     std::call_once(this->error_flag_, [this]() { transport_->HandleError("requested stop"); });
427 }
428 
Write(std::unique_ptr<apacket> packet)429 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
430     {
431         std::lock_guard<std::mutex> lock(this->mutex_);
432         write_queue_.emplace_back(std::move(packet));
433     }
434 
435     cv_.notify_one();
436     return true;
437 }
438 
FdConnection(unique_fd fd)439 FdConnection::FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
440 
~FdConnection()441 FdConnection::~FdConnection() {}
442 
DispatchRead(void * buf,size_t len)443 bool FdConnection::DispatchRead(void* buf, size_t len) {
444     if (tls_ != nullptr) {
445         // The TlsConnection doesn't allow 0 byte reads
446         if (len == 0) {
447             return true;
448         }
449         return tls_->ReadFully(buf, len);
450     }
451 
452     return ReadFdExactly(fd_.get(), buf, len);
453 }
454 
DispatchWrite(void * buf,size_t len)455 bool FdConnection::DispatchWrite(void* buf, size_t len) {
456     if (tls_ != nullptr) {
457         // The TlsConnection doesn't allow 0 byte writes
458         if (len == 0) {
459             return true;
460         }
461         return tls_->WriteFully(std::string_view(reinterpret_cast<const char*>(buf), len));
462     }
463 
464     return WriteFdExactly(fd_.get(), buf, len);
465 }
466 
Read(apacket * packet)467 bool FdConnection::Read(apacket* packet) {
468     if (!DispatchRead(&packet->msg, sizeof(amessage))) {
469         D("remote local: read terminated (message)");
470         return false;
471     }
472 
473     if (packet->msg.data_length > MAX_PAYLOAD) {
474         D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
475         return false;
476     }
477 
478     packet->payload.resize(packet->msg.data_length);
479 
480     if (!DispatchRead(&packet->payload[0], packet->payload.size())) {
481         D("remote local: terminated (data)");
482         return false;
483     }
484 
485     return true;
486 }
487 
Write(apacket * packet)488 bool FdConnection::Write(apacket* packet) {
489     if (!DispatchWrite(&packet->msg, sizeof(packet->msg))) {
490         D("remote local: write terminated");
491         return false;
492     }
493 
494     if (packet->msg.data_length) {
495         if (!DispatchWrite(&packet->payload[0], packet->msg.data_length)) {
496             D("remote local: write terminated");
497             return false;
498         }
499     }
500 
501     return true;
502 }
503 
DoTlsHandshake(RSA * key,std::string * auth_key)504 bool FdConnection::DoTlsHandshake(RSA* key, std::string* auth_key) {
505     bssl::UniquePtr<EVP_PKEY> evp_pkey(EVP_PKEY_new());
506     if (!EVP_PKEY_set1_RSA(evp_pkey.get(), key)) {
507         LOG(ERROR) << "EVP_PKEY_set1_RSA failed";
508         return false;
509     }
510     auto x509 = GenerateX509Certificate(evp_pkey.get());
511     auto x509_str = X509ToPEMString(x509.get());
512     auto evp_str = Key::ToPEMString(evp_pkey.get());
513 
514     int osh = cast_handle_to_int(adb_get_os_handle(fd_));
515 #if ADB_HOST
516     tls_ = TlsConnection::Create(TlsConnection::Role::Client, x509_str, evp_str, osh);
517 #else
518     tls_ = TlsConnection::Create(TlsConnection::Role::Server, x509_str, evp_str, osh);
519 #endif
520     CHECK(tls_);
521 #if ADB_HOST
522     // TLS 1.3 gives the client no message if the server rejected the
523     // certificate. This will enable a check in the tls connection to check
524     // whether the client certificate got rejected. Note that this assumes
525     // that, on handshake success, the server speaks first.
526     tls_->EnableClientPostHandshakeCheck(true);
527     // Add callback to set the certificate when server issues the
528     // CertificateRequest.
529     tls_->SetCertificateCallback(adb_tls_set_certificate);
530     // Allow any server certificate
531     tls_->SetCertVerifyCallback([](X509_STORE_CTX*) { return 1; });
532 #else
533     // Add callback to check certificate against a list of known public keys
534     tls_->SetCertVerifyCallback(
535             [auth_key](X509_STORE_CTX* ctx) { return adbd_tls_verify_cert(ctx, auth_key); });
536     // Add the list of allowed client CA issuers
537     auto ca_list = adbd_tls_client_ca_list();
538     tls_->SetClientCAList(ca_list.get());
539 #endif
540 
541     auto err = tls_->DoHandshake();
542     if (err == TlsError::Success) {
543         return true;
544     }
545 
546     tls_.reset();
547     return false;
548 }
549 
Close()550 void FdConnection::Close() {
551     adb_shutdown(fd_.get());
552     fd_.reset();
553 }
554 
send_packet(apacket * p,atransport * t)555 void send_packet(apacket* p, atransport* t) {
556     p->msg.magic = p->msg.command ^ 0xffffffff;
557     // compute a checksum for connection/auth packets for compatibility reasons
558     if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
559         p->msg.data_check = 0;
560     } else {
561         p->msg.data_check = calculate_apacket_checksum(p);
562     }
563 
564     VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
565 
566     if (t == nullptr) {
567         LOG(FATAL) << "Transport is null";
568     }
569 
570     if (t->Write(p) != 0) {
571         D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
572         t->Kick();
573     }
574 }
575 
kick_transport(atransport * t,bool reset)576 void kick_transport(atransport* t, bool reset) {
577     std::lock_guard<std::recursive_mutex> lock(transport_lock);
578     // As kick_transport() can be called from threads without guarantee that t is valid,
579     // check if the transport is in transport_list first.
580     //
581     // TODO(jmgao): WTF? Is this actually true?
582     if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
583         if (reset) {
584             t->Reset();
585         } else {
586             t->Kick();
587         }
588     }
589 
590 #if ADB_HOST
591     reconnect_handler.CheckForKicked();
592 #endif
593 }
594 
595 static int transport_registration_send = -1;
596 static int transport_registration_recv = -1;
597 static fdevent* transport_registration_fde;
598 
599 #if ADB_HOST
600 
601 /* this adds support required by the 'track-devices' service.
602  * this is used to send the content of "list_transport" to any
603  * number of client connections that want it through a single
604  * live TCP connection
605  */
606 struct device_tracker {
607     asocket socket;
608     bool update_needed = false;
609     bool long_output = false;
610     device_tracker* next = nullptr;
611 };
612 
613 /* linked list of all device trackers */
614 static device_tracker* device_tracker_list;
615 
device_tracker_remove(device_tracker * tracker)616 static void device_tracker_remove(device_tracker* tracker) {
617     device_tracker** pnode = &device_tracker_list;
618     device_tracker* node = *pnode;
619 
620     std::lock_guard<std::recursive_mutex> lock(transport_lock);
621     while (node) {
622         if (node == tracker) {
623             *pnode = node->next;
624             break;
625         }
626         pnode = &node->next;
627         node = *pnode;
628     }
629 }
630 
device_tracker_close(asocket * socket)631 static void device_tracker_close(asocket* socket) {
632     device_tracker* tracker = (device_tracker*)socket;
633     asocket* peer = socket->peer;
634 
635     D("device tracker %p removed", tracker);
636     if (peer) {
637         peer->peer = nullptr;
638         peer->close(peer);
639     }
640     device_tracker_remove(tracker);
641     delete tracker;
642 }
643 
device_tracker_enqueue(asocket * socket,apacket::payload_type)644 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
645     /* you can't read from a device tracker, close immediately */
646     device_tracker_close(socket);
647     return -1;
648 }
649 
device_tracker_send(device_tracker * tracker,const std::string & string)650 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
651     asocket* peer = tracker->socket.peer;
652 
653     apacket::payload_type data;
654     data.resize(4 + string.size());
655     char buf[5];
656     snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
657     memcpy(&data[0], buf, 4);
658     memcpy(&data[4], string.data(), string.size());
659     return peer->enqueue(peer, std::move(data));
660 }
661 
device_tracker_ready(asocket * socket)662 static void device_tracker_ready(asocket* socket) {
663     device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
664 
665     // We want to send the device list when the tracker connects
666     // for the first time, even if no update occurred.
667     if (tracker->update_needed) {
668         tracker->update_needed = false;
669         device_tracker_send(tracker, list_transports(tracker->long_output));
670     }
671 }
672 
create_device_tracker(bool long_output)673 asocket* create_device_tracker(bool long_output) {
674     device_tracker* tracker = new device_tracker();
675     if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
676 
677     D("device tracker %p created", tracker);
678 
679     tracker->socket.enqueue = device_tracker_enqueue;
680     tracker->socket.ready = device_tracker_ready;
681     tracker->socket.close = device_tracker_close;
682     tracker->update_needed = true;
683     tracker->long_output = long_output;
684 
685     tracker->next = device_tracker_list;
686     device_tracker_list = tracker;
687 
688     return &tracker->socket;
689 }
690 
691 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)692 bool iterate_transports(std::function<bool(const atransport*)> fn) {
693     std::lock_guard<std::recursive_mutex> lock(transport_lock);
694     for (const auto& t : transport_list) {
695         if (!fn(t)) {
696             return false;
697         }
698     }
699     for (const auto& t : pending_list) {
700         if (!fn(t)) {
701             return false;
702         }
703     }
704     return true;
705 }
706 
707 // Call this function each time the transport list has changed.
update_transports()708 void update_transports() {
709     update_transport_status();
710 
711     // Notify `adb track-devices` clients.
712     device_tracker* tracker = device_tracker_list;
713     while (tracker != nullptr) {
714         device_tracker* next = tracker->next;
715         // This may destroy the tracker if the connection is closed.
716         device_tracker_send(tracker, list_transports(tracker->long_output));
717         tracker = next;
718     }
719 }
720 
721 #else
722 
update_transports()723 void update_transports() {
724     // Nothing to do on the device side.
725 }
726 
727 #endif  // ADB_HOST
728 
729 // The transport listeners communicate with the transports list via fdevent. tmsg structure is a
730 // container used as message unit and written to a pipe in order to communicate the transport
731 // (pointer) and the action to perform.
732 //
733 //     Transport listener         FDEVENT          Transports list
734 //     --------------------------------------------------------------
735 //     (&transport,action)   ->    tmsg     ->   (&transport, action)
736 //
737 // TODO: Figure out if this fdevent bridge really is necessary? With the re-entrant lock to sync
738 // access, what prevents us from "simply" updating the transport_list directly?
739 
740 struct tmsg {
741     atransport* transport;
742 
743     enum struct Action : int {
744         UNREGISTER = 0,  // Unregister the transport from transport list (typically a device has
745                          // been unplugged from USB or disconnected from TCP.
746         REGISTER = 1,  // Register the transport to the transport list (typically a device has been
747                        // plugged via USB or connected via TCP.
748     };
749     Action action;
750 };
751 
transport_read_action(int fd,struct tmsg * m)752 static int transport_read_action(int fd, struct tmsg* m) {
753     char* p = (char*)m;
754     int len = sizeof(*m);
755     int r;
756 
757     while (len > 0) {
758         r = adb_read(fd, p, len);
759         if (r > 0) {
760             len -= r;
761             p += r;
762         } else {
763             D("transport_read_action: on fd %d: %s", fd, strerror(errno));
764             return -1;
765         }
766     }
767     return 0;
768 }
769 
transport_write_action(int fd,struct tmsg * m)770 static int transport_write_action(int fd, struct tmsg* m) {
771     char* p = (char*)m;
772     int len = sizeof(*m);
773     int r;
774 
775     while (len > 0) {
776         r = adb_write(fd, p, len);
777         if (r > 0) {
778             len -= r;
779             p += r;
780         } else {
781             D("transport_write_action: on fd %d: %s", fd, strerror(errno));
782             return -1;
783         }
784     }
785     return 0;
786 }
787 
usb_devices_start_detached()788 static bool usb_devices_start_detached() {
789 #if ADB_HOST
790     static const char* env = getenv("ADB_LIBUSB_START_DETACHED");
791     static bool result = env && strcmp("1", env) == 0;
792     return should_use_libusb() && result;
793 #else
794     return false;
795 #endif
796 }
797 
transport_registration_func(int _fd,unsigned ev,void *)798 static void transport_registration_func(int _fd, unsigned ev, void*) {
799     tmsg m;
800     atransport* t;
801 
802     if (!(ev & FDE_READ)) {
803         return;
804     }
805 
806     if (transport_read_action(_fd, &m)) {
807         PLOG(FATAL) << "cannot read transport registration socket";
808     }
809 
810     t = m.transport;
811 
812     if (m.action == tmsg::Action::UNREGISTER) {
813         D("transport: %s deleting", t->serial.c_str());
814 
815         {
816             std::lock_guard<std::recursive_mutex> lock(transport_lock);
817             transport_list.remove(t);
818         }
819 
820         delete t;
821 
822         update_transports();
823         return;
824     }
825 
826     /* don't create transport threads for inaccessible devices */
827     if (t->GetConnectionState() != kCsNoPerm) {
828         t->connection()->SetTransport(t);
829 
830         if (t->type == kTransportUsb && usb_devices_start_detached()) {
831             t->SetConnectionState(kCsDetached);
832         } else {
833             t->connection()->Start();
834 #if ADB_HOST
835             send_connect(t);
836 #endif
837         }
838     }
839 
840     {
841         std::lock_guard<std::recursive_mutex> lock(transport_lock);
842         auto it = std::find(pending_list.begin(), pending_list.end(), t);
843         if (it != pending_list.end()) {
844             pending_list.remove(t);
845             transport_list.push_front(t);
846         }
847     }
848 
849     update_transports();
850 }
851 
852 #if ADB_HOST
init_reconnect_handler(void)853 void init_reconnect_handler(void) {
854     reconnect_handler.Start();
855 }
856 #endif
857 
init_transport_registration(void)858 void init_transport_registration(void) {
859     int s[2];
860 
861     if (adb_socketpair(s)) {
862         PLOG(FATAL) << "cannot open transport registration socketpair";
863     }
864     D("socketpair: (%d,%d)", s[0], s[1]);
865 
866     transport_registration_send = s[0];
867     transport_registration_recv = s[1];
868 
869     transport_registration_fde =
870             fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
871     fdevent_set(transport_registration_fde, FDE_READ);
872 }
873 
kick_all_transports()874 void kick_all_transports() {
875 #if ADB_HOST
876     reconnect_handler.Stop();
877 #endif
878     // To avoid only writing part of a packet to a transport after exit, kick all transports.
879     std::lock_guard<std::recursive_mutex> lock(transport_lock);
880     for (auto t : transport_list) {
881         t->Kick();
882     }
883 }
884 
kick_all_tcp_tls_transports()885 void kick_all_tcp_tls_transports() {
886     std::lock_guard<std::recursive_mutex> lock(transport_lock);
887     for (auto t : transport_list) {
888         if (t->IsTcpDevice() && t->use_tls) {
889             t->Kick();
890         }
891     }
892 }
893 
894 #if !ADB_HOST
kick_all_transports_by_auth_key(std::string_view auth_key)895 void kick_all_transports_by_auth_key(std::string_view auth_key) {
896     std::lock_guard<std::recursive_mutex> lock(transport_lock);
897     for (auto t : transport_list) {
898         if (auth_key == t->auth_key) {
899             t->Kick();
900         }
901     }
902 }
903 #endif
904 
905 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)906 void register_transport(atransport* transport) {
907     tmsg m;
908     m.transport = transport;
909     m.action = tmsg::Action::REGISTER;
910     D("transport: %s registered", transport->serial.c_str());
911     if (transport_write_action(transport_registration_send, &m)) {
912         PLOG(FATAL) << "cannot write transport registration socket";
913     }
914 }
915 
remove_transport(atransport * transport)916 static void remove_transport(atransport* transport) {
917     tmsg m;
918     m.transport = transport;
919     m.action = tmsg::Action::UNREGISTER;
920     D("transport: %s removed", transport->serial.c_str());
921     if (transport_write_action(transport_registration_send, &m)) {
922         PLOG(FATAL) << "cannot write transport registration socket";
923     }
924 }
925 
transport_destroy(atransport * t)926 static void transport_destroy(atransport* t) {
927     fdevent_check_looper();
928     CHECK(t != nullptr);
929 
930     std::lock_guard<std::recursive_mutex> lock(transport_lock);
931     LOG(INFO) << "destroying transport " << t->serial_name();
932     t->connection()->Stop();
933 #if ADB_HOST
934     if (t->IsTcpDevice() && !t->kicked()) {
935         D("transport: %s destroy (attempting reconnection)", t->serial.c_str());
936 
937         // We need to clear the transport's keys, so that on the next connection, it tries
938         // again from the beginning.
939         t->ResetKeys();
940         reconnect_handler.TrackTransport(t);
941         return;
942     }
943 #endif
944 
945     D("transport: %s destroy (kicking and closing)", t->serial.c_str());
946     remove_transport(t);
947 }
948 
949 #if ADB_HOST
qual_match(const std::string & to_test,const char * prefix,const std::string & qual,bool sanitize_qual)950 static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
951                       bool sanitize_qual) {
952     if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
953         return qual.empty();
954 
955     if (qual.empty()) return 0;
956 
957     const char* ptr = to_test.c_str();
958     if (prefix) {
959         while (*prefix) {
960             if (*prefix++ != *ptr++) return 0;
961         }
962     }
963 
964     for (char ch : qual) {
965         if (sanitize_qual && !isalnum(ch)) ch = '_';
966         if (ch != *ptr++) return 0;
967     }
968 
969     /* Everything matched so far.  Return true if *ptr is a NUL. */
970     return !*ptr;
971 }
972 
973 // Contains either a device serial string or a USB device address like "usb:2-6"
974 const char* __transport_server_one_device = nullptr;
975 
transport_set_one_device(const char * adb_one_device)976 void transport_set_one_device(const char* adb_one_device) {
977     __transport_server_one_device = adb_one_device;
978 }
979 
transport_get_one_device()980 const char* transport_get_one_device() {
981     return __transport_server_one_device;
982 }
983 
transport_server_owns_device(std::string_view serial)984 bool transport_server_owns_device(std::string_view serial) {
985     if (!__transport_server_one_device) {
986         // If the server doesn't own one device, server owns all devices.
987         return true;
988     }
989     return serial.compare(__transport_server_one_device) == 0;
990 }
991 
transport_server_owns_device(std::string_view dev_path,std::string_view serial)992 bool transport_server_owns_device(std::string_view dev_path, std::string_view serial) {
993     if (!__transport_server_one_device) {
994         // If the server doesn't own one device, server owns all devices.
995         return true;
996     }
997     return serial.compare(__transport_server_one_device) == 0 ||
998            dev_path.compare(__transport_server_one_device) == 0;
999 }
1000 
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)1001 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
1002                                   bool* is_ambiguous, std::string* error_out,
1003                                   bool accept_any_state) {
1004     atransport* result = nullptr;
1005 
1006     if (transport_id != 0) {
1007         *error_out = android::base::StringPrintf("no device with transport id '%" PRIu64 "'",
1008                                                  transport_id);
1009     } else if (serial) {
1010         *error_out = android::base::StringPrintf("device '%s' not found", serial);
1011     } else if (type == kTransportLocal) {
1012         *error_out = "no emulators found";
1013     } else if (type == kTransportAny) {
1014         *error_out = "no devices/emulators found";
1015     } else {
1016         *error_out = "no devices found";
1017     }
1018 
1019     std::unique_lock<std::recursive_mutex> lock(transport_lock);
1020     for (const auto& t : transport_list) {
1021         if (t->GetConnectionState() == kCsNoPerm) {
1022             *error_out = UsbNoPermissionsLongHelpText();
1023             continue;
1024         }
1025 
1026         if (transport_id) {
1027             if (t->id == transport_id) {
1028                 result = t;
1029                 break;
1030             }
1031         } else if (serial) {
1032             if (t->MatchesTarget(serial)) {
1033                 if (result) {
1034                     *error_out = "more than one device with serial "s + serial;
1035                     if (is_ambiguous) *is_ambiguous = true;
1036                     result = nullptr;
1037                     break;
1038                 }
1039                 result = t;
1040             }
1041         } else {
1042             if (type == kTransportUsb && t->type == kTransportUsb) {
1043                 if (result) {
1044                     *error_out = "more than one USB device";
1045                     if (is_ambiguous) *is_ambiguous = true;
1046                     result = nullptr;
1047                     break;
1048                 }
1049                 result = t;
1050             } else if (type == kTransportLocal && t->type == kTransportLocal) {
1051                 if (result) {
1052                     *error_out = "more than one emulator";
1053                     if (is_ambiguous) *is_ambiguous = true;
1054                     result = nullptr;
1055                     break;
1056                 }
1057                 result = t;
1058             } else if (type == kTransportAny) {
1059                 if (result) {
1060                     *error_out = "more than one device/emulator";
1061                     if (is_ambiguous) *is_ambiguous = true;
1062                     result = nullptr;
1063                     break;
1064                 }
1065                 result = t;
1066             }
1067         }
1068     }
1069     lock.unlock();
1070 
1071     if (result && !accept_any_state) {
1072         // The caller requires an active transport.
1073         // Make sure that we're actually connected.
1074         ConnectionState state = result->GetConnectionState();
1075         switch (state) {
1076             case kCsConnecting:
1077                 *error_out = "device still connecting";
1078                 result = nullptr;
1079                 break;
1080 
1081             case kCsAuthorizing:
1082                 *error_out = "device still authorizing";
1083                 result = nullptr;
1084                 break;
1085 
1086             case kCsUnauthorized: {
1087                 *error_out = "device unauthorized.\n";
1088                 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
1089                 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
1090                 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
1091                 *error_out += "\n";
1092                 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
1093                 *error_out += "Otherwise check for a confirmation dialog on your device.";
1094                 result = nullptr;
1095                 break;
1096             }
1097 
1098             case kCsOffline:
1099                 *error_out = "device offline";
1100                 result = nullptr;
1101                 break;
1102 
1103             default:
1104                 break;
1105         }
1106     }
1107 
1108     if (result) {
1109         *error_out = "success";
1110     }
1111 
1112     return result;
1113 }
1114 
WaitForConnection(std::chrono::milliseconds timeout)1115 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
1116     std::unique_lock<std::mutex> lock(mutex_);
1117     ScopedLockAssertion assume_locked(mutex_);
1118     return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
1119         return connection_established_ready_;
1120     }) && connection_established_;
1121 }
1122 
SetConnectionEstablished(bool success)1123 void ConnectionWaitable::SetConnectionEstablished(bool success) {
1124     {
1125         std::lock_guard<std::mutex> lock(mutex_);
1126         if (connection_established_ready_) return;
1127         connection_established_ready_ = true;
1128         connection_established_ = success;
1129         D("connection established with %d", success);
1130     }
1131     cv_.notify_one();
1132 }
1133 #endif
1134 
~atransport()1135 atransport::~atransport() {
1136 #if ADB_HOST
1137     // If the connection callback had not been run before, run it now.
1138     SetConnectionEstablished(false);
1139 #endif
1140 }
1141 
Write(apacket * p)1142 int atransport::Write(apacket* p) {
1143     return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
1144 }
1145 
Reset()1146 void atransport::Reset() {
1147     if (!kicked_.exchange(true)) {
1148         LOG(INFO) << "resetting transport " << this << " " << this->serial;
1149         this->connection()->Reset();
1150     }
1151 }
1152 
Kick()1153 void atransport::Kick() {
1154     if (!kicked_.exchange(true)) {
1155         LOG(INFO) << "kicking transport " << this << " " << this->serial;
1156         this->connection()->Stop();
1157     }
1158 }
1159 
GetConnectionState() const1160 ConnectionState atransport::GetConnectionState() const {
1161     return connection_state_;
1162 }
1163 
SetConnectionState(ConnectionState state)1164 void atransport::SetConnectionState(ConnectionState state) {
1165     fdevent_check_looper();
1166     connection_state_ = state;
1167     update_transports();
1168 }
1169 
1170 #if ADB_HOST
Attach(std::string * error)1171 bool atransport::Attach(std::string* error) {
1172     D("%s: attach", serial.c_str());
1173     fdevent_check_looper();
1174 
1175     if (!should_use_libusb()) {
1176         *error = "attach/detach only implemented for libusb backend";
1177         return false;
1178     }
1179 
1180     if (GetConnectionState() != ConnectionState::kCsDetached) {
1181         *error = android::base::StringPrintf("transport %s is not detached", serial.c_str());
1182         return false;
1183     }
1184 
1185     ResetKeys();
1186 
1187     {
1188         std::lock_guard<std::mutex> lock(mutex_);
1189         if (!connection_->Attach(error)) {
1190             return false;
1191         }
1192     }
1193 
1194     send_connect(this);
1195     return true;
1196 }
1197 
Detach(std::string * error)1198 bool atransport::Detach(std::string* error) {
1199     D("%s: detach", serial.c_str());
1200     fdevent_check_looper();
1201 
1202     if (!should_use_libusb()) {
1203         *error = "attach/detach only implemented for libusb backend";
1204         return false;
1205     }
1206 
1207     if (GetConnectionState() == ConnectionState::kCsDetached) {
1208         *error = android::base::StringPrintf("transport %s is already detached", serial.c_str());
1209         return false;
1210     }
1211 
1212     handle_offline(this);
1213 
1214     {
1215         std::lock_guard<std::mutex> lock(mutex_);
1216         if (!connection_->Detach(error)) {
1217             return false;
1218         }
1219     }
1220 
1221     this->SetConnectionState(kCsDetached);
1222     return true;
1223 }
1224 #endif
1225 
SetConnection(std::shared_ptr<Connection> connection)1226 void atransport::SetConnection(std::shared_ptr<Connection> connection) {
1227     std::lock_guard<std::mutex> lock(mutex_);
1228     connection_ = std::shared_ptr<Connection>(std::move(connection));
1229 }
1230 
HandleRead(std::unique_ptr<apacket> p)1231 bool atransport::HandleRead(std::unique_ptr<apacket> p) {
1232     if (!check_header(p.get(), this)) {
1233         D("%s: remote read: bad header", serial.c_str());
1234         return false;
1235     }
1236 
1237     VLOG(TRANSPORT) << dump_packet(serial.c_str(), "from remote", p.get());
1238     apacket* packet = p.release();
1239 
1240     // This needs to run on the looper thread since the associated fdevent
1241     // message pump exists in that context.
1242     fdevent_run_on_looper([packet, this]() { handle_packet(packet, this); });
1243 
1244     return true;
1245 }
1246 
HandleError(const std::string & error)1247 void atransport::HandleError(const std::string& error) {
1248     LOG(INFO) << serial_name() << ": connection terminated: " << error;
1249     fdevent_run_on_looper([this]() {
1250         handle_offline(this);
1251         transport_destroy(this);
1252     });
1253 }
1254 
update_version(int version,size_t payload)1255 void atransport::update_version(int version, size_t payload) {
1256     protocol_version = std::min(version, A_VERSION);
1257     max_payload = std::min(payload, MAX_PAYLOAD);
1258 }
1259 
get_protocol_version() const1260 int atransport::get_protocol_version() const {
1261     return protocol_version;
1262 }
1263 
get_tls_version() const1264 int atransport::get_tls_version() const {
1265     return tls_version;
1266 }
1267 
get_max_payload() const1268 size_t atransport::get_max_payload() const {
1269     return max_payload;
1270 }
1271 
1272 #if ADB_HOST
delayed_ack_enabled()1273 static bool delayed_ack_enabled() {
1274     static const char* env = getenv("ADB_DELAYED_ACK");
1275     static bool result = env && strcmp(env, "1") == 0;
1276     return result;
1277 }
1278 #endif
1279 
supported_features()1280 const FeatureSet& supported_features() {
1281     static const android::base::NoDestructor<FeatureSet> features([]() {
1282         // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1283         // to know about. Otherwise, the client can be stuck running an old
1284         // version of the server even after upgrading their copy of adb.
1285         // (http://b/24370690)
1286 
1287         // clang-format off
1288         FeatureSet result {
1289             kFeatureShell2,
1290             kFeatureCmd,
1291             kFeatureStat2,
1292             kFeatureLs2,
1293             kFeatureFixedPushMkdir,
1294             kFeatureApex,
1295             kFeatureAbb,
1296             kFeatureFixedPushSymlinkTimestamp,
1297             kFeatureAbbExec,
1298             kFeatureRemountShell,
1299             kFeatureTrackApp,
1300             kFeatureSendRecv2,
1301             kFeatureSendRecv2Brotli,
1302             kFeatureSendRecv2LZ4,
1303             kFeatureSendRecv2Zstd,
1304             kFeatureSendRecv2DryRunSend,
1305             kFeatureOpenscreenMdns,
1306         };
1307         // clang-format on
1308 
1309 #if ADB_HOST
1310         if (delayed_ack_enabled()) {
1311             result.push_back(kFeatureDelayedAck);
1312         }
1313 #else
1314         result.push_back(kFeatureDelayedAck);
1315 #endif
1316         return result;
1317     }());
1318 
1319     return *features;
1320 }
1321 
FeatureSetToString(const FeatureSet & features)1322 std::string FeatureSetToString(const FeatureSet& features) {
1323     return android::base::Join(features, ',');
1324 }
1325 
StringToFeatureSet(const std::string & features_string)1326 FeatureSet StringToFeatureSet(const std::string& features_string) {
1327     if (features_string.empty()) {
1328         return FeatureSet();
1329     }
1330 
1331     return android::base::Split(features_string, ",");
1332 }
1333 
1334 template <class Range, class Value>
contains(const Range & r,const Value & v)1335 static bool contains(const Range& r, const Value& v) {
1336     return std::find(std::begin(r), std::end(r), v) != std::end(r);
1337 }
1338 
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1339 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1340     return contains(feature_set, feature) && contains(supported_features(), feature);
1341 }
1342 
has_feature(const std::string & feature) const1343 bool atransport::has_feature(const std::string& feature) const {
1344     return contains(features_, feature);
1345 }
1346 
SetFeatures(const std::string & features_string)1347 void atransport::SetFeatures(const std::string& features_string) {
1348     features_ = StringToFeatureSet(features_string);
1349     delayed_ack_ = CanUseFeature(features_, kFeatureDelayedAck);
1350 }
1351 
AddDisconnect(adisconnect * disconnect)1352 void atransport::AddDisconnect(adisconnect* disconnect) {
1353     disconnects_.push_back(disconnect);
1354 }
1355 
RemoveDisconnect(adisconnect * disconnect)1356 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1357     disconnects_.remove(disconnect);
1358 }
1359 
RunDisconnects()1360 void atransport::RunDisconnects() {
1361     for (const auto& disconnect : disconnects_) {
1362         disconnect->func(disconnect->opaque, this);
1363     }
1364     disconnects_.clear();
1365 }
1366 
1367 #if ADB_HOST
MatchesTarget(const std::string & target) const1368 bool atransport::MatchesTarget(const std::string& target) const {
1369     if (!serial.empty()) {
1370         if (target == serial) {
1371             return true;
1372         } else if (type == kTransportLocal) {
1373             // Local transports can match [tcp:|udp:]<hostname>[:port].
1374             const char* local_target_ptr = target.c_str();
1375 
1376             // For fastboot compatibility, ignore protocol prefixes.
1377             if (android::base::StartsWith(target, "tcp:") ||
1378                 android::base::StartsWith(target, "udp:")) {
1379                 local_target_ptr += 4;
1380             }
1381 
1382             // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1383             std::string serial_host, error;
1384             int serial_port = -1;
1385             if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr,
1386                                                &error)) {
1387                 // |target| may omit the port to default to ours.
1388                 std::string target_host;
1389                 int target_port = serial_port;
1390                 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1391                                                    nullptr, &error) &&
1392                     serial_host == target_host && serial_port == target_port) {
1393                     return true;
1394                 }
1395             }
1396         }
1397     }
1398 
1399     return (target == devpath) || qual_match(target, "product:", product, false) ||
1400            qual_match(target, "model:", model, true) ||
1401            qual_match(target, "device:", device, false);
1402 }
1403 
SetConnectionEstablished(bool success)1404 void atransport::SetConnectionEstablished(bool success) {
1405     connection_waitable_->SetConnectionEstablished(success);
1406 }
1407 
Reconnect()1408 ReconnectResult atransport::Reconnect() {
1409     return reconnect_(this);
1410 }
1411 
1412 // We use newline as our delimiter, make sure to never output it.
sanitize(std::string str,bool alphanumeric)1413 static std::string sanitize(std::string str, bool alphanumeric) {
1414     auto pred = alphanumeric ? [](const char c) { return !isalnum(c); }
1415                              : [](const char c) { return c == '\n'; };
1416     std::replace_if(str.begin(), str.end(), pred, '_');
1417     return str;
1418 }
1419 
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1420 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1421                                   bool alphanumeric) {
1422     if (value.empty()) {
1423         return;
1424     }
1425 
1426     *result += ' ';
1427     *result += key;
1428     *result += sanitize(value, alphanumeric);
1429 }
1430 
append_transport(const atransport * t,std::string * result,bool long_listing)1431 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1432     std::string serial = t->serial;
1433     if (serial.empty()) {
1434         serial = "(no serial number)";
1435     }
1436 
1437     if (!long_listing) {
1438         *result += serial;
1439         *result += '\t';
1440         *result += to_string(t->GetConnectionState());
1441     } else {
1442         android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1443                                      to_string(t->GetConnectionState()).c_str());
1444 
1445         append_transport_info(result, "", t->devpath, false);
1446         append_transport_info(result, "product:", t->product, false);
1447         append_transport_info(result, "model:", t->model, true);
1448         append_transport_info(result, "device:", t->device, false);
1449 
1450         // Put id at the end, so that anyone parsing the output here can always find it by scanning
1451         // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1452         *result += " transport_id:";
1453         *result += std::to_string(t->id);
1454     }
1455     *result += '\n';
1456 }
1457 
list_transports(bool long_listing)1458 std::string list_transports(bool long_listing) {
1459     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1460 
1461     auto sorted_transport_list = transport_list;
1462     sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1463         if (x->type != y->type) {
1464             return x->type < y->type;
1465         }
1466         return x->serial < y->serial;
1467     });
1468 
1469     std::string result;
1470     for (const auto& t : sorted_transport_list) {
1471         append_transport(t, &result, long_listing);
1472     }
1473     return result;
1474 }
1475 
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1476 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1477     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1478     for (auto& t : transport_list) {
1479         if (predicate(t)) {
1480             if (reset) {
1481                 t->Reset();
1482             } else {
1483                 t->Kick();
1484             }
1485         }
1486     }
1487 }
1488 
1489 /* hack for osx */
close_usb_devices(bool reset)1490 void close_usb_devices(bool reset) {
1491     close_usb_devices([](const atransport*) { return true; }, reset);
1492 }
1493 #endif
1494 
validate_transport_list(const std::list<atransport * > & list,const std::string & serial,atransport * t,int * error)1495 bool validate_transport_list(const std::list<atransport*>& list, const std::string& serial,
1496                              atransport* t, int* error) {
1497     for (const auto& transport : list) {
1498         if (serial == transport->serial) {
1499             const std::string list_name(&list == &pending_list ? "pending" : "transport");
1500             VLOG(TRANSPORT) << "socket transport " << transport->serial << " is already in the "
1501                             << list_name << " list and fails to register";
1502             delete t;
1503             if (error) *error = EALREADY;
1504             return false;
1505         }
1506     }
1507     return true;
1508 }
1509 
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,bool use_tls,int * error)1510 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1511                                atransport::ReconnectCallback reconnect, bool use_tls, int* error) {
1512     atransport* t = new atransport(std::move(reconnect), kCsOffline);
1513     t->use_tls = use_tls;
1514     t->serial = std::move(serial);
1515 
1516     D("transport: %s init'ing for socket %d, on port %d", t->serial.c_str(), s.get(), port);
1517     if (init_socket_transport(t, std::move(s), port, local) < 0) {
1518         delete t;
1519         if (error) *error = errno;
1520         return false;
1521     }
1522 
1523     std::unique_lock<std::recursive_mutex> lock(transport_lock);
1524     if (!validate_transport_list(pending_list, t->serial, t, error)) {
1525         return false;
1526     }
1527 
1528     if (!validate_transport_list(transport_list, t->serial, t, error)) {
1529         return false;
1530     }
1531 
1532     pending_list.push_front(t);
1533 
1534     lock.unlock();
1535 
1536 #if ADB_HOST
1537     auto waitable = t->connection_waitable();
1538 #endif
1539     register_transport(t);
1540 
1541     if (local == 1) {
1542         // Do not wait for emulator transports.
1543         return true;
1544     }
1545 
1546 #if ADB_HOST
1547     if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1548         if (error) *error = ETIMEDOUT;
1549         return false;
1550     }
1551 
1552     if (t->GetConnectionState() == kCsUnauthorized) {
1553         if (error) *error = EPERM;
1554         return false;
1555     }
1556 #endif
1557 
1558     return true;
1559 }
1560 
1561 #if ADB_HOST
find_transport(const char * serial)1562 atransport* find_transport(const char* serial) {
1563     atransport* result = nullptr;
1564 
1565     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1566     for (auto& t : transport_list) {
1567         if (strcmp(serial, t->serial.c_str()) == 0) {
1568             result = t;
1569             break;
1570         }
1571     }
1572 
1573     return result;
1574 }
1575 
kick_all_tcp_devices()1576 void kick_all_tcp_devices() {
1577     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1578     for (auto& t : transport_list) {
1579         if (t->IsTcpDevice()) {
1580             // Kicking breaks the read_transport thread of this transport out of any read, then
1581             // the read_transport thread will notify the main thread to make this transport
1582             // offline. Then the main thread will notify the write_transport thread to exit.
1583             // Finally, this transport will be closed and freed in the main thread.
1584             t->Kick();
1585         }
1586     }
1587     reconnect_handler.CheckForKicked();
1588 }
1589 
register_usb_transport(std::shared_ptr<Connection> connection,const char * serial,const char * devpath,unsigned writeable)1590 void register_usb_transport(std::shared_ptr<Connection> connection, const char* serial,
1591                             const char* devpath, unsigned writeable) {
1592     atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1593     if (serial) {
1594         t->serial = serial;
1595     }
1596     if (devpath) {
1597         t->devpath = devpath;
1598     }
1599 
1600     t->SetConnection(std::move(connection));
1601     t->type = kTransportUsb;
1602 
1603     {
1604         std::lock_guard<std::recursive_mutex> lock(transport_lock);
1605         pending_list.push_front(t);
1606     }
1607 
1608     register_transport(t);
1609 }
1610 
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1611 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1612                             unsigned writeable) {
1613     atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1614 
1615     D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1616     init_usb_transport(t, usb);
1617     if (serial) {
1618         t->serial = serial;
1619     }
1620 
1621     if (devpath) {
1622         t->devpath = devpath;
1623     }
1624 
1625     {
1626         std::lock_guard<std::recursive_mutex> lock(transport_lock);
1627         pending_list.push_front(t);
1628     }
1629 
1630     register_transport(t);
1631 }
1632 
1633 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1634 void unregister_usb_transport(usb_handle* usb) {
1635     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1636     transport_list.remove_if([usb](atransport* t) {
1637         return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1638     });
1639 }
1640 
1641 // Track reverse:forward commands, so that info can be used to develop
1642 // an 'allow-list':
1643 //   - adb reverse tcp:<device_port> localhost:<host_port> : responds with the
1644 //   device_port
1645 //   - adb reverse --remove tcp:<device_port> : responds OKAY
1646 //   - adb reverse --remove-all : responds OKAY
UpdateReverseConfig(std::string_view service_addr)1647 void atransport::UpdateReverseConfig(std::string_view service_addr) {
1648     fdevent_check_looper();
1649     if (!android::base::ConsumePrefix(&service_addr, "reverse:")) {
1650         return;
1651     }
1652 
1653     if (android::base::ConsumePrefix(&service_addr, "forward:")) {
1654         // forward:[norebind:]<remote>;<local>
1655         bool norebind = android::base::ConsumePrefix(&service_addr, "norebind:");
1656         auto it = service_addr.find(';');
1657         if (it == std::string::npos) {
1658             return;
1659         }
1660         std::string remote(service_addr.substr(0, it));
1661 
1662         if (norebind && reverse_forwards_.find(remote) != reverse_forwards_.end()) {
1663             // This will fail, don't update the map.
1664             LOG(DEBUG) << "ignoring reverse forward that will fail due to norebind";
1665             return;
1666         }
1667 
1668         std::string local(service_addr.substr(it + 1));
1669         reverse_forwards_[remote] = local;
1670     } else if (android::base::ConsumePrefix(&service_addr, "killforward:")) {
1671         // kill-forward:<remote>
1672         auto it = service_addr.find(';');
1673         if (it != std::string::npos) {
1674             return;
1675         }
1676         reverse_forwards_.erase(std::string(service_addr));
1677     } else if (service_addr == "killforward-all") {
1678         reverse_forwards_.clear();
1679     } else if (service_addr == "list-forward") {
1680         LOG(DEBUG) << __func__ << " ignoring --list";
1681     } else {  // Anything else we need to know about?
1682         LOG(FATAL) << "unhandled reverse service: " << service_addr;
1683     }
1684 }
1685 
1686 // Is this an authorized :connect request?
IsReverseConfigured(const std::string & local_addr)1687 bool atransport::IsReverseConfigured(const std::string& local_addr) {
1688     fdevent_check_looper();
1689     for (const auto& [remote, local] : reverse_forwards_) {
1690         if (local == local_addr) {
1691             return true;
1692         }
1693     }
1694     return false;
1695 }
1696 
1697 #endif
1698 
check_header(apacket * p,atransport * t)1699 bool check_header(apacket* p, atransport* t) {
1700     if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1701         VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1702                   << ", magic = " << p->msg.magic;
1703         return false;
1704     }
1705 
1706     if (p->msg.data_length > t->get_max_payload()) {
1707         VLOG(RWX) << "check_header(): " << p->msg.data_length
1708                   << " atransport::max_payload = " << t->get_max_payload();
1709         return false;
1710     }
1711 
1712     return true;
1713 }
1714 
1715 #if ADB_HOST
Key()1716 std::shared_ptr<RSA> atransport::Key() {
1717     if (keys_.empty()) {
1718         return nullptr;
1719     }
1720 
1721     std::shared_ptr<RSA> result = keys_[0];
1722     return result;
1723 }
1724 
NextKey()1725 std::shared_ptr<RSA> atransport::NextKey() {
1726     if (keys_.empty()) {
1727         LOG(INFO) << "fetching keys for transport " << this->serial_name();
1728         keys_ = adb_auth_get_private_keys();
1729 
1730         // We should have gotten at least one key: the one that's automatically generated.
1731         CHECK(!keys_.empty());
1732     } else {
1733         keys_.pop_front();
1734     }
1735 
1736     return Key();
1737 }
1738 
ResetKeys()1739 void atransport::ResetKeys() {
1740     keys_.clear();
1741 }
1742 #endif
1743