1 /*
2 * Copyright (C) 2007 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define TRACE_TAG TRANSPORT
18
19 #include "sysdeps.h"
20
21 #include "transport.h"
22
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30
31 #include <algorithm>
32 #include <list>
33 #include <memory>
34 #include <mutex>
35 #include <set>
36 #include <string>
37 #include <thread>
38
39 #include <adb/crypto/rsa_2048_key.h>
40 #include <adb/crypto/x509_generator.h>
41 #include <adb/tls/tls_connection.h>
42 #include <android-base/logging.h>
43 #include <android-base/no_destructor.h>
44 #include <android-base/parsenetaddress.h>
45 #include <android-base/stringprintf.h>
46 #include <android-base/strings.h>
47 #include <android-base/thread_annotations.h>
48 #include <diagnose_usb.h>
49
50 #include "adb.h"
51 #include "adb_auth.h"
52 #include "adb_io.h"
53 #include "adb_trace.h"
54 #include "adb_utils.h"
55 #include "fdevent/fdevent.h"
56 #include "sysdeps/chrono.h"
57
58 #if ADB_HOST
59 #include "client/usb.h"
60 #endif
61
62 using namespace adb::crypto;
63 using namespace adb::tls;
64 using namespace std::string_literals;
65 using android::base::ScopedLockAssertion;
66 using TlsError = TlsConnection::TlsError;
67
68 static void remove_transport(atransport* transport);
69 static void transport_destroy(atransport* transport);
70
71 // TODO: unordered_map<TransportId, atransport*>
72 static auto& transport_list = *new std::list<atransport*>();
73 static auto& pending_list = *new std::list<atransport*>();
74
75 static auto& transport_lock = *new std::recursive_mutex();
76
77 const char* const kFeatureShell2 = "shell_v2";
78 const char* const kFeatureCmd = "cmd";
79 const char* const kFeatureStat2 = "stat_v2";
80 const char* const kFeatureLs2 = "ls_v2";
81 const char* const kFeatureLibusb = "libusb";
82 const char* const kFeaturePushSync = "push_sync";
83 const char* const kFeatureApex = "apex";
84 const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
85 const char* const kFeatureAbb = "abb";
86 const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
87 const char* const kFeatureAbbExec = "abb_exec";
88 const char* const kFeatureRemountShell = "remount_shell";
89 const char* const kFeatureTrackApp = "track_app";
90 const char* const kFeatureSendRecv2 = "sendrecv_v2";
91 const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
92 const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
93 const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd";
94 const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send";
95 const char* const kFeatureDelayedAck = "delayed_ack";
96 // TODO(joshuaduong): Bump to v2 when openscreen discovery is enabled by default
97 const char* const kFeatureOpenscreenMdns = "openscreen_mdns";
98
99 namespace {
100
101 #if ADB_HOST
102
103 // Tracks and handles atransport*s that are attempting reconnection.
104 class ReconnectHandler {
105 public:
106 ReconnectHandler() = default;
107 ~ReconnectHandler() = default;
108
109 // Starts the ReconnectHandler thread.
110 void Start();
111
112 // Requests the ReconnectHandler thread to stop.
113 void Stop();
114
115 // Adds the atransport* to the queue of reconnect attempts.
116 void TrackTransport(atransport* transport);
117
118 // Wake up the ReconnectHandler thread to have it check for kicked transports.
119 void CheckForKicked();
120
121 private:
122 // The main thread loop.
123 void Run();
124
125 // Tracks a reconnection attempt.
126 struct ReconnectAttempt {
127 atransport* transport;
128 std::chrono::steady_clock::time_point reconnect_time;
129 size_t attempts_left;
130
operator <__anonc80066ee0111::ReconnectHandler::ReconnectAttempt131 bool operator<(const ReconnectAttempt& rhs) const {
132 if (reconnect_time == rhs.reconnect_time) {
133 return reinterpret_cast<uintptr_t>(transport) <
134 reinterpret_cast<uintptr_t>(rhs.transport);
135 }
136 return reconnect_time < rhs.reconnect_time;
137 }
138 };
139
140 // Only retry for up to one minute.
141 static constexpr const std::chrono::seconds kDefaultTimeout = 3s;
142 static constexpr const size_t kMaxAttempts = 20;
143
144 // Protects all members.
145 std::mutex reconnect_mutex_;
146 bool running_ GUARDED_BY(reconnect_mutex_) = true;
147 std::thread handler_thread_;
148 std::condition_variable reconnect_cv_;
149 std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
150
151 DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
152 };
153
Start()154 void ReconnectHandler::Start() {
155 fdevent_check_looper();
156 handler_thread_ = std::thread(&ReconnectHandler::Run, this);
157 }
158
Stop()159 void ReconnectHandler::Stop() {
160 fdevent_check_looper();
161 {
162 std::lock_guard<std::mutex> lock(reconnect_mutex_);
163 running_ = false;
164 }
165 reconnect_cv_.notify_one();
166 handler_thread_.join();
167
168 // Drain the queue to free all resources.
169 std::lock_guard<std::mutex> lock(reconnect_mutex_);
170 while (!reconnect_queue_.empty()) {
171 ReconnectAttempt attempt = *reconnect_queue_.begin();
172 reconnect_queue_.erase(reconnect_queue_.begin());
173 remove_transport(attempt.transport);
174 }
175 }
176
TrackTransport(atransport * transport)177 void ReconnectHandler::TrackTransport(atransport* transport) {
178 fdevent_check_looper();
179 {
180 std::lock_guard<std::mutex> lock(reconnect_mutex_);
181 if (!running_) return;
182 // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
183 auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
184 reconnect_queue_.emplace(
185 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
186 }
187 reconnect_cv_.notify_one();
188 }
189
CheckForKicked()190 void ReconnectHandler::CheckForKicked() {
191 reconnect_cv_.notify_one();
192 }
193
Run()194 void ReconnectHandler::Run() {
195 while (true) {
196 ReconnectAttempt attempt;
197 {
198 std::unique_lock<std::mutex> lock(reconnect_mutex_);
199 ScopedLockAssertion assume_lock(reconnect_mutex_);
200
201 if (!reconnect_queue_.empty()) {
202 // FIXME: libstdc++ (used on Windows) implements condition_variable with
203 // system_clock as its clock, so we're probably hosed if the clock changes,
204 // even if we use steady_clock throughout. This problem goes away once we
205 // switch to libc++.
206 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
207 } else {
208 reconnect_cv_.wait(lock);
209 }
210
211 if (!running_) return;
212
213 // Scan the whole list for kicked transports, so that we immediately handle an explicit
214 // disconnect request.
215 for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
216 if (it->transport->kicked()) {
217 D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
218 remove_transport(it->transport);
219 it = reconnect_queue_.erase(it);
220 } else {
221 ++it;
222 }
223 }
224
225 if (reconnect_queue_.empty()) continue;
226
227 // Go back to sleep if we either woke up spuriously, or we were woken up to remove
228 // a kicked transport, and the first transport isn't ready for reconnection yet.
229 auto now = std::chrono::steady_clock::now();
230 if (reconnect_queue_.begin()->reconnect_time > now) {
231 continue;
232 }
233
234 attempt = *reconnect_queue_.begin();
235 reconnect_queue_.erase(reconnect_queue_.begin());
236 }
237 D("attempting to reconnect %s", attempt.transport->serial.c_str());
238
239 switch (attempt.transport->Reconnect()) {
240 case ReconnectResult::Retry: {
241 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
242 if (attempt.attempts_left == 0) {
243 D("transport %s exceeded the number of retry attempts. giving up on it.",
244 attempt.transport->serial.c_str());
245 remove_transport(attempt.transport);
246 continue;
247 }
248
249 std::lock_guard<std::mutex> lock(reconnect_mutex_);
250 reconnect_queue_.emplace(ReconnectAttempt{
251 attempt.transport,
252 std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
253 attempt.attempts_left - 1});
254 continue;
255 }
256
257 case ReconnectResult::Success:
258 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
259 register_transport(attempt.transport);
260 continue;
261
262 case ReconnectResult::Abort:
263 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
264 remove_transport(attempt.transport);
265 continue;
266 }
267 }
268 }
269
270 static auto& reconnect_handler = *new ReconnectHandler();
271
272 #endif
273
274 } // namespace
275
NextTransportId()276 TransportId NextTransportId() {
277 static std::atomic<TransportId> next(1);
278 return next++;
279 }
280
Reset()281 void Connection::Reset() {
282 LOG(INFO) << "Connection::Reset(): stopping";
283 Stop();
284 }
285
Serial() const286 std::string Connection::Serial() const {
287 return transport_ ? transport_->serial_name() : "<unknown>";
288 }
289
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)290 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
291 : underlying_(std::move(connection)) {}
292
~BlockingConnectionAdapter()293 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
294 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): destructing";
295 Stop();
296 }
297
Start()298 void BlockingConnectionAdapter::Start() {
299 std::lock_guard<std::mutex> lock(mutex_);
300 if (started_) {
301 LOG(FATAL) << "BlockingConnectionAdapter(" << Serial() << "): started multiple times";
302 }
303
304 StartReadThread();
305
306 write_thread_ = std::thread([this]() {
307 LOG(INFO) << Serial() << ": write thread spawning";
308 while (true) {
309 std::unique_lock<std::mutex> lock(mutex_);
310 ScopedLockAssertion assume_locked(mutex_);
311 cv_.wait(lock, [this]() REQUIRES(mutex_) {
312 return this->stopped_ || !this->write_queue_.empty();
313 });
314
315 if (this->stopped_) {
316 return;
317 }
318
319 std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
320 this->write_queue_.pop_front();
321 lock.unlock();
322
323 if (!this->underlying_->Write(packet.get())) {
324 break;
325 }
326 }
327 std::call_once(this->error_flag_, [this]() { transport_->HandleError("write failed"); });
328 });
329
330 started_ = true;
331 }
332
StartReadThread()333 void BlockingConnectionAdapter::StartReadThread() {
334 read_thread_ = std::thread([this]() {
335 LOG(INFO) << Serial() << ": read thread spawning";
336 while (true) {
337 auto packet = std::make_unique<apacket>();
338 if (!underlying_->Read(packet.get())) {
339 PLOG(INFO) << Serial() << ": read failed";
340 break;
341 }
342
343 bool got_stls_cmd = false;
344 if (packet->msg.command == A_STLS) {
345 got_stls_cmd = true;
346 }
347
348 transport_->HandleRead(std::move(packet));
349
350 // If we received the STLS packet, we are about to perform the TLS
351 // handshake. So this read thread must stop and resume after the
352 // handshake completes otherwise this will interfere in the process.
353 if (got_stls_cmd) {
354 LOG(INFO) << Serial() << ": Received STLS packet. Stopping read thread.";
355 return;
356 }
357 }
358 std::call_once(this->error_flag_, [this]() { transport_->HandleError("read failed"); });
359 });
360 }
361
DoTlsHandshake(RSA * key,std::string * auth_key)362 bool BlockingConnectionAdapter::DoTlsHandshake(RSA* key, std::string* auth_key) {
363 std::lock_guard<std::mutex> lock(mutex_);
364 if (read_thread_.joinable()) {
365 read_thread_.join();
366 }
367 bool success = this->underlying_->DoTlsHandshake(key, auth_key);
368 StartReadThread();
369 return success;
370 }
371
Reset()372 void BlockingConnectionAdapter::Reset() {
373 {
374 std::lock_guard<std::mutex> lock(mutex_);
375 if (!started_) {
376 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
377 return;
378 }
379
380 if (stopped_) {
381 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
382 return;
383 }
384 }
385
386 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): resetting";
387 this->underlying_->Reset();
388 Stop();
389 }
390
Stop()391 void BlockingConnectionAdapter::Stop() {
392 {
393 std::lock_guard<std::mutex> lock(mutex_);
394 if (!started_) {
395 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): not started";
396 return;
397 }
398
399 if (stopped_) {
400 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): already stopped";
401 return;
402 }
403
404 stopped_ = true;
405 }
406
407 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopping";
408
409 this->underlying_->Close();
410 this->cv_.notify_one();
411
412 // Move the threads out into locals with the lock taken, and then unlock to let them exit.
413 std::thread read_thread;
414 std::thread write_thread;
415
416 {
417 std::lock_guard<std::mutex> lock(mutex_);
418 read_thread = std::move(read_thread_);
419 write_thread = std::move(write_thread_);
420 }
421
422 read_thread.join();
423 write_thread.join();
424
425 LOG(INFO) << "BlockingConnectionAdapter(" << Serial() << "): stopped";
426 std::call_once(this->error_flag_, [this]() { transport_->HandleError("requested stop"); });
427 }
428
Write(std::unique_ptr<apacket> packet)429 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
430 {
431 std::lock_guard<std::mutex> lock(this->mutex_);
432 write_queue_.emplace_back(std::move(packet));
433 }
434
435 cv_.notify_one();
436 return true;
437 }
438
FdConnection(unique_fd fd)439 FdConnection::FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
440
~FdConnection()441 FdConnection::~FdConnection() {}
442
DispatchRead(void * buf,size_t len)443 bool FdConnection::DispatchRead(void* buf, size_t len) {
444 if (tls_ != nullptr) {
445 // The TlsConnection doesn't allow 0 byte reads
446 if (len == 0) {
447 return true;
448 }
449 return tls_->ReadFully(buf, len);
450 }
451
452 return ReadFdExactly(fd_.get(), buf, len);
453 }
454
DispatchWrite(void * buf,size_t len)455 bool FdConnection::DispatchWrite(void* buf, size_t len) {
456 if (tls_ != nullptr) {
457 // The TlsConnection doesn't allow 0 byte writes
458 if (len == 0) {
459 return true;
460 }
461 return tls_->WriteFully(std::string_view(reinterpret_cast<const char*>(buf), len));
462 }
463
464 return WriteFdExactly(fd_.get(), buf, len);
465 }
466
Read(apacket * packet)467 bool FdConnection::Read(apacket* packet) {
468 if (!DispatchRead(&packet->msg, sizeof(amessage))) {
469 D("remote local: read terminated (message)");
470 return false;
471 }
472
473 if (packet->msg.data_length > MAX_PAYLOAD) {
474 D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
475 return false;
476 }
477
478 packet->payload.resize(packet->msg.data_length);
479
480 if (!DispatchRead(&packet->payload[0], packet->payload.size())) {
481 D("remote local: terminated (data)");
482 return false;
483 }
484
485 return true;
486 }
487
Write(apacket * packet)488 bool FdConnection::Write(apacket* packet) {
489 if (!DispatchWrite(&packet->msg, sizeof(packet->msg))) {
490 D("remote local: write terminated");
491 return false;
492 }
493
494 if (packet->msg.data_length) {
495 if (!DispatchWrite(&packet->payload[0], packet->msg.data_length)) {
496 D("remote local: write terminated");
497 return false;
498 }
499 }
500
501 return true;
502 }
503
DoTlsHandshake(RSA * key,std::string * auth_key)504 bool FdConnection::DoTlsHandshake(RSA* key, std::string* auth_key) {
505 bssl::UniquePtr<EVP_PKEY> evp_pkey(EVP_PKEY_new());
506 if (!EVP_PKEY_set1_RSA(evp_pkey.get(), key)) {
507 LOG(ERROR) << "EVP_PKEY_set1_RSA failed";
508 return false;
509 }
510 auto x509 = GenerateX509Certificate(evp_pkey.get());
511 auto x509_str = X509ToPEMString(x509.get());
512 auto evp_str = Key::ToPEMString(evp_pkey.get());
513
514 int osh = cast_handle_to_int(adb_get_os_handle(fd_));
515 #if ADB_HOST
516 tls_ = TlsConnection::Create(TlsConnection::Role::Client, x509_str, evp_str, osh);
517 #else
518 tls_ = TlsConnection::Create(TlsConnection::Role::Server, x509_str, evp_str, osh);
519 #endif
520 CHECK(tls_);
521 #if ADB_HOST
522 // TLS 1.3 gives the client no message if the server rejected the
523 // certificate. This will enable a check in the tls connection to check
524 // whether the client certificate got rejected. Note that this assumes
525 // that, on handshake success, the server speaks first.
526 tls_->EnableClientPostHandshakeCheck(true);
527 // Add callback to set the certificate when server issues the
528 // CertificateRequest.
529 tls_->SetCertificateCallback(adb_tls_set_certificate);
530 // Allow any server certificate
531 tls_->SetCertVerifyCallback([](X509_STORE_CTX*) { return 1; });
532 #else
533 // Add callback to check certificate against a list of known public keys
534 tls_->SetCertVerifyCallback(
535 [auth_key](X509_STORE_CTX* ctx) { return adbd_tls_verify_cert(ctx, auth_key); });
536 // Add the list of allowed client CA issuers
537 auto ca_list = adbd_tls_client_ca_list();
538 tls_->SetClientCAList(ca_list.get());
539 #endif
540
541 auto err = tls_->DoHandshake();
542 if (err == TlsError::Success) {
543 return true;
544 }
545
546 tls_.reset();
547 return false;
548 }
549
Close()550 void FdConnection::Close() {
551 adb_shutdown(fd_.get());
552 fd_.reset();
553 }
554
send_packet(apacket * p,atransport * t)555 void send_packet(apacket* p, atransport* t) {
556 p->msg.magic = p->msg.command ^ 0xffffffff;
557 // compute a checksum for connection/auth packets for compatibility reasons
558 if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
559 p->msg.data_check = 0;
560 } else {
561 p->msg.data_check = calculate_apacket_checksum(p);
562 }
563
564 VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
565
566 if (t == nullptr) {
567 LOG(FATAL) << "Transport is null";
568 }
569
570 if (t->Write(p) != 0) {
571 D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
572 t->Kick();
573 }
574 }
575
kick_transport(atransport * t,bool reset)576 void kick_transport(atransport* t, bool reset) {
577 std::lock_guard<std::recursive_mutex> lock(transport_lock);
578 // As kick_transport() can be called from threads without guarantee that t is valid,
579 // check if the transport is in transport_list first.
580 //
581 // TODO(jmgao): WTF? Is this actually true?
582 if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
583 if (reset) {
584 t->Reset();
585 } else {
586 t->Kick();
587 }
588 }
589
590 #if ADB_HOST
591 reconnect_handler.CheckForKicked();
592 #endif
593 }
594
595 static int transport_registration_send = -1;
596 static int transport_registration_recv = -1;
597 static fdevent* transport_registration_fde;
598
599 #if ADB_HOST
600
601 /* this adds support required by the 'track-devices' service.
602 * this is used to send the content of "list_transport" to any
603 * number of client connections that want it through a single
604 * live TCP connection
605 */
606 struct device_tracker {
607 asocket socket;
608 bool update_needed = false;
609 bool long_output = false;
610 device_tracker* next = nullptr;
611 };
612
613 /* linked list of all device trackers */
614 static device_tracker* device_tracker_list;
615
device_tracker_remove(device_tracker * tracker)616 static void device_tracker_remove(device_tracker* tracker) {
617 device_tracker** pnode = &device_tracker_list;
618 device_tracker* node = *pnode;
619
620 std::lock_guard<std::recursive_mutex> lock(transport_lock);
621 while (node) {
622 if (node == tracker) {
623 *pnode = node->next;
624 break;
625 }
626 pnode = &node->next;
627 node = *pnode;
628 }
629 }
630
device_tracker_close(asocket * socket)631 static void device_tracker_close(asocket* socket) {
632 device_tracker* tracker = (device_tracker*)socket;
633 asocket* peer = socket->peer;
634
635 D("device tracker %p removed", tracker);
636 if (peer) {
637 peer->peer = nullptr;
638 peer->close(peer);
639 }
640 device_tracker_remove(tracker);
641 delete tracker;
642 }
643
device_tracker_enqueue(asocket * socket,apacket::payload_type)644 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
645 /* you can't read from a device tracker, close immediately */
646 device_tracker_close(socket);
647 return -1;
648 }
649
device_tracker_send(device_tracker * tracker,const std::string & string)650 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
651 asocket* peer = tracker->socket.peer;
652
653 apacket::payload_type data;
654 data.resize(4 + string.size());
655 char buf[5];
656 snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
657 memcpy(&data[0], buf, 4);
658 memcpy(&data[4], string.data(), string.size());
659 return peer->enqueue(peer, std::move(data));
660 }
661
device_tracker_ready(asocket * socket)662 static void device_tracker_ready(asocket* socket) {
663 device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
664
665 // We want to send the device list when the tracker connects
666 // for the first time, even if no update occurred.
667 if (tracker->update_needed) {
668 tracker->update_needed = false;
669 device_tracker_send(tracker, list_transports(tracker->long_output));
670 }
671 }
672
create_device_tracker(bool long_output)673 asocket* create_device_tracker(bool long_output) {
674 device_tracker* tracker = new device_tracker();
675 if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
676
677 D("device tracker %p created", tracker);
678
679 tracker->socket.enqueue = device_tracker_enqueue;
680 tracker->socket.ready = device_tracker_ready;
681 tracker->socket.close = device_tracker_close;
682 tracker->update_needed = true;
683 tracker->long_output = long_output;
684
685 tracker->next = device_tracker_list;
686 device_tracker_list = tracker;
687
688 return &tracker->socket;
689 }
690
691 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)692 bool iterate_transports(std::function<bool(const atransport*)> fn) {
693 std::lock_guard<std::recursive_mutex> lock(transport_lock);
694 for (const auto& t : transport_list) {
695 if (!fn(t)) {
696 return false;
697 }
698 }
699 for (const auto& t : pending_list) {
700 if (!fn(t)) {
701 return false;
702 }
703 }
704 return true;
705 }
706
707 // Call this function each time the transport list has changed.
update_transports()708 void update_transports() {
709 update_transport_status();
710
711 // Notify `adb track-devices` clients.
712 device_tracker* tracker = device_tracker_list;
713 while (tracker != nullptr) {
714 device_tracker* next = tracker->next;
715 // This may destroy the tracker if the connection is closed.
716 device_tracker_send(tracker, list_transports(tracker->long_output));
717 tracker = next;
718 }
719 }
720
721 #else
722
update_transports()723 void update_transports() {
724 // Nothing to do on the device side.
725 }
726
727 #endif // ADB_HOST
728
729 // The transport listeners communicate with the transports list via fdevent. tmsg structure is a
730 // container used as message unit and written to a pipe in order to communicate the transport
731 // (pointer) and the action to perform.
732 //
733 // Transport listener FDEVENT Transports list
734 // --------------------------------------------------------------
735 // (&transport,action) -> tmsg -> (&transport, action)
736 //
737 // TODO: Figure out if this fdevent bridge really is necessary? With the re-entrant lock to sync
738 // access, what prevents us from "simply" updating the transport_list directly?
739
740 struct tmsg {
741 atransport* transport;
742
743 enum struct Action : int {
744 UNREGISTER = 0, // Unregister the transport from transport list (typically a device has
745 // been unplugged from USB or disconnected from TCP.
746 REGISTER = 1, // Register the transport to the transport list (typically a device has been
747 // plugged via USB or connected via TCP.
748 };
749 Action action;
750 };
751
transport_read_action(int fd,struct tmsg * m)752 static int transport_read_action(int fd, struct tmsg* m) {
753 char* p = (char*)m;
754 int len = sizeof(*m);
755 int r;
756
757 while (len > 0) {
758 r = adb_read(fd, p, len);
759 if (r > 0) {
760 len -= r;
761 p += r;
762 } else {
763 D("transport_read_action: on fd %d: %s", fd, strerror(errno));
764 return -1;
765 }
766 }
767 return 0;
768 }
769
transport_write_action(int fd,struct tmsg * m)770 static int transport_write_action(int fd, struct tmsg* m) {
771 char* p = (char*)m;
772 int len = sizeof(*m);
773 int r;
774
775 while (len > 0) {
776 r = adb_write(fd, p, len);
777 if (r > 0) {
778 len -= r;
779 p += r;
780 } else {
781 D("transport_write_action: on fd %d: %s", fd, strerror(errno));
782 return -1;
783 }
784 }
785 return 0;
786 }
787
usb_devices_start_detached()788 static bool usb_devices_start_detached() {
789 #if ADB_HOST
790 static const char* env = getenv("ADB_LIBUSB_START_DETACHED");
791 static bool result = env && strcmp("1", env) == 0;
792 return should_use_libusb() && result;
793 #else
794 return false;
795 #endif
796 }
797
transport_registration_func(int _fd,unsigned ev,void *)798 static void transport_registration_func(int _fd, unsigned ev, void*) {
799 tmsg m;
800 atransport* t;
801
802 if (!(ev & FDE_READ)) {
803 return;
804 }
805
806 if (transport_read_action(_fd, &m)) {
807 PLOG(FATAL) << "cannot read transport registration socket";
808 }
809
810 t = m.transport;
811
812 if (m.action == tmsg::Action::UNREGISTER) {
813 D("transport: %s deleting", t->serial.c_str());
814
815 {
816 std::lock_guard<std::recursive_mutex> lock(transport_lock);
817 transport_list.remove(t);
818 }
819
820 delete t;
821
822 update_transports();
823 return;
824 }
825
826 /* don't create transport threads for inaccessible devices */
827 if (t->GetConnectionState() != kCsNoPerm) {
828 t->connection()->SetTransport(t);
829
830 if (t->type == kTransportUsb && usb_devices_start_detached()) {
831 t->SetConnectionState(kCsDetached);
832 } else {
833 t->connection()->Start();
834 #if ADB_HOST
835 send_connect(t);
836 #endif
837 }
838 }
839
840 {
841 std::lock_guard<std::recursive_mutex> lock(transport_lock);
842 auto it = std::find(pending_list.begin(), pending_list.end(), t);
843 if (it != pending_list.end()) {
844 pending_list.remove(t);
845 transport_list.push_front(t);
846 }
847 }
848
849 update_transports();
850 }
851
852 #if ADB_HOST
init_reconnect_handler(void)853 void init_reconnect_handler(void) {
854 reconnect_handler.Start();
855 }
856 #endif
857
init_transport_registration(void)858 void init_transport_registration(void) {
859 int s[2];
860
861 if (adb_socketpair(s)) {
862 PLOG(FATAL) << "cannot open transport registration socketpair";
863 }
864 D("socketpair: (%d,%d)", s[0], s[1]);
865
866 transport_registration_send = s[0];
867 transport_registration_recv = s[1];
868
869 transport_registration_fde =
870 fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
871 fdevent_set(transport_registration_fde, FDE_READ);
872 }
873
kick_all_transports()874 void kick_all_transports() {
875 #if ADB_HOST
876 reconnect_handler.Stop();
877 #endif
878 // To avoid only writing part of a packet to a transport after exit, kick all transports.
879 std::lock_guard<std::recursive_mutex> lock(transport_lock);
880 for (auto t : transport_list) {
881 t->Kick();
882 }
883 }
884
kick_all_tcp_tls_transports()885 void kick_all_tcp_tls_transports() {
886 std::lock_guard<std::recursive_mutex> lock(transport_lock);
887 for (auto t : transport_list) {
888 if (t->IsTcpDevice() && t->use_tls) {
889 t->Kick();
890 }
891 }
892 }
893
894 #if !ADB_HOST
kick_all_transports_by_auth_key(std::string_view auth_key)895 void kick_all_transports_by_auth_key(std::string_view auth_key) {
896 std::lock_guard<std::recursive_mutex> lock(transport_lock);
897 for (auto t : transport_list) {
898 if (auth_key == t->auth_key) {
899 t->Kick();
900 }
901 }
902 }
903 #endif
904
905 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)906 void register_transport(atransport* transport) {
907 tmsg m;
908 m.transport = transport;
909 m.action = tmsg::Action::REGISTER;
910 D("transport: %s registered", transport->serial.c_str());
911 if (transport_write_action(transport_registration_send, &m)) {
912 PLOG(FATAL) << "cannot write transport registration socket";
913 }
914 }
915
remove_transport(atransport * transport)916 static void remove_transport(atransport* transport) {
917 tmsg m;
918 m.transport = transport;
919 m.action = tmsg::Action::UNREGISTER;
920 D("transport: %s removed", transport->serial.c_str());
921 if (transport_write_action(transport_registration_send, &m)) {
922 PLOG(FATAL) << "cannot write transport registration socket";
923 }
924 }
925
transport_destroy(atransport * t)926 static void transport_destroy(atransport* t) {
927 fdevent_check_looper();
928 CHECK(t != nullptr);
929
930 std::lock_guard<std::recursive_mutex> lock(transport_lock);
931 LOG(INFO) << "destroying transport " << t->serial_name();
932 t->connection()->Stop();
933 #if ADB_HOST
934 if (t->IsTcpDevice() && !t->kicked()) {
935 D("transport: %s destroy (attempting reconnection)", t->serial.c_str());
936
937 // We need to clear the transport's keys, so that on the next connection, it tries
938 // again from the beginning.
939 t->ResetKeys();
940 reconnect_handler.TrackTransport(t);
941 return;
942 }
943 #endif
944
945 D("transport: %s destroy (kicking and closing)", t->serial.c_str());
946 remove_transport(t);
947 }
948
949 #if ADB_HOST
qual_match(const std::string & to_test,const char * prefix,const std::string & qual,bool sanitize_qual)950 static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
951 bool sanitize_qual) {
952 if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
953 return qual.empty();
954
955 if (qual.empty()) return 0;
956
957 const char* ptr = to_test.c_str();
958 if (prefix) {
959 while (*prefix) {
960 if (*prefix++ != *ptr++) return 0;
961 }
962 }
963
964 for (char ch : qual) {
965 if (sanitize_qual && !isalnum(ch)) ch = '_';
966 if (ch != *ptr++) return 0;
967 }
968
969 /* Everything matched so far. Return true if *ptr is a NUL. */
970 return !*ptr;
971 }
972
973 // Contains either a device serial string or a USB device address like "usb:2-6"
974 const char* __transport_server_one_device = nullptr;
975
transport_set_one_device(const char * adb_one_device)976 void transport_set_one_device(const char* adb_one_device) {
977 __transport_server_one_device = adb_one_device;
978 }
979
transport_get_one_device()980 const char* transport_get_one_device() {
981 return __transport_server_one_device;
982 }
983
transport_server_owns_device(std::string_view serial)984 bool transport_server_owns_device(std::string_view serial) {
985 if (!__transport_server_one_device) {
986 // If the server doesn't own one device, server owns all devices.
987 return true;
988 }
989 return serial.compare(__transport_server_one_device) == 0;
990 }
991
transport_server_owns_device(std::string_view dev_path,std::string_view serial)992 bool transport_server_owns_device(std::string_view dev_path, std::string_view serial) {
993 if (!__transport_server_one_device) {
994 // If the server doesn't own one device, server owns all devices.
995 return true;
996 }
997 return serial.compare(__transport_server_one_device) == 0 ||
998 dev_path.compare(__transport_server_one_device) == 0;
999 }
1000
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)1001 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
1002 bool* is_ambiguous, std::string* error_out,
1003 bool accept_any_state) {
1004 atransport* result = nullptr;
1005
1006 if (transport_id != 0) {
1007 *error_out = android::base::StringPrintf("no device with transport id '%" PRIu64 "'",
1008 transport_id);
1009 } else if (serial) {
1010 *error_out = android::base::StringPrintf("device '%s' not found", serial);
1011 } else if (type == kTransportLocal) {
1012 *error_out = "no emulators found";
1013 } else if (type == kTransportAny) {
1014 *error_out = "no devices/emulators found";
1015 } else {
1016 *error_out = "no devices found";
1017 }
1018
1019 std::unique_lock<std::recursive_mutex> lock(transport_lock);
1020 for (const auto& t : transport_list) {
1021 if (t->GetConnectionState() == kCsNoPerm) {
1022 *error_out = UsbNoPermissionsLongHelpText();
1023 continue;
1024 }
1025
1026 if (transport_id) {
1027 if (t->id == transport_id) {
1028 result = t;
1029 break;
1030 }
1031 } else if (serial) {
1032 if (t->MatchesTarget(serial)) {
1033 if (result) {
1034 *error_out = "more than one device with serial "s + serial;
1035 if (is_ambiguous) *is_ambiguous = true;
1036 result = nullptr;
1037 break;
1038 }
1039 result = t;
1040 }
1041 } else {
1042 if (type == kTransportUsb && t->type == kTransportUsb) {
1043 if (result) {
1044 *error_out = "more than one USB device";
1045 if (is_ambiguous) *is_ambiguous = true;
1046 result = nullptr;
1047 break;
1048 }
1049 result = t;
1050 } else if (type == kTransportLocal && t->type == kTransportLocal) {
1051 if (result) {
1052 *error_out = "more than one emulator";
1053 if (is_ambiguous) *is_ambiguous = true;
1054 result = nullptr;
1055 break;
1056 }
1057 result = t;
1058 } else if (type == kTransportAny) {
1059 if (result) {
1060 *error_out = "more than one device/emulator";
1061 if (is_ambiguous) *is_ambiguous = true;
1062 result = nullptr;
1063 break;
1064 }
1065 result = t;
1066 }
1067 }
1068 }
1069 lock.unlock();
1070
1071 if (result && !accept_any_state) {
1072 // The caller requires an active transport.
1073 // Make sure that we're actually connected.
1074 ConnectionState state = result->GetConnectionState();
1075 switch (state) {
1076 case kCsConnecting:
1077 *error_out = "device still connecting";
1078 result = nullptr;
1079 break;
1080
1081 case kCsAuthorizing:
1082 *error_out = "device still authorizing";
1083 result = nullptr;
1084 break;
1085
1086 case kCsUnauthorized: {
1087 *error_out = "device unauthorized.\n";
1088 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
1089 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
1090 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
1091 *error_out += "\n";
1092 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
1093 *error_out += "Otherwise check for a confirmation dialog on your device.";
1094 result = nullptr;
1095 break;
1096 }
1097
1098 case kCsOffline:
1099 *error_out = "device offline";
1100 result = nullptr;
1101 break;
1102
1103 default:
1104 break;
1105 }
1106 }
1107
1108 if (result) {
1109 *error_out = "success";
1110 }
1111
1112 return result;
1113 }
1114
WaitForConnection(std::chrono::milliseconds timeout)1115 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
1116 std::unique_lock<std::mutex> lock(mutex_);
1117 ScopedLockAssertion assume_locked(mutex_);
1118 return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
1119 return connection_established_ready_;
1120 }) && connection_established_;
1121 }
1122
SetConnectionEstablished(bool success)1123 void ConnectionWaitable::SetConnectionEstablished(bool success) {
1124 {
1125 std::lock_guard<std::mutex> lock(mutex_);
1126 if (connection_established_ready_) return;
1127 connection_established_ready_ = true;
1128 connection_established_ = success;
1129 D("connection established with %d", success);
1130 }
1131 cv_.notify_one();
1132 }
1133 #endif
1134
~atransport()1135 atransport::~atransport() {
1136 #if ADB_HOST
1137 // If the connection callback had not been run before, run it now.
1138 SetConnectionEstablished(false);
1139 #endif
1140 }
1141
Write(apacket * p)1142 int atransport::Write(apacket* p) {
1143 return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
1144 }
1145
Reset()1146 void atransport::Reset() {
1147 if (!kicked_.exchange(true)) {
1148 LOG(INFO) << "resetting transport " << this << " " << this->serial;
1149 this->connection()->Reset();
1150 }
1151 }
1152
Kick()1153 void atransport::Kick() {
1154 if (!kicked_.exchange(true)) {
1155 LOG(INFO) << "kicking transport " << this << " " << this->serial;
1156 this->connection()->Stop();
1157 }
1158 }
1159
GetConnectionState() const1160 ConnectionState atransport::GetConnectionState() const {
1161 return connection_state_;
1162 }
1163
SetConnectionState(ConnectionState state)1164 void atransport::SetConnectionState(ConnectionState state) {
1165 fdevent_check_looper();
1166 connection_state_ = state;
1167 update_transports();
1168 }
1169
1170 #if ADB_HOST
Attach(std::string * error)1171 bool atransport::Attach(std::string* error) {
1172 D("%s: attach", serial.c_str());
1173 fdevent_check_looper();
1174
1175 if (!should_use_libusb()) {
1176 *error = "attach/detach only implemented for libusb backend";
1177 return false;
1178 }
1179
1180 if (GetConnectionState() != ConnectionState::kCsDetached) {
1181 *error = android::base::StringPrintf("transport %s is not detached", serial.c_str());
1182 return false;
1183 }
1184
1185 ResetKeys();
1186
1187 {
1188 std::lock_guard<std::mutex> lock(mutex_);
1189 if (!connection_->Attach(error)) {
1190 return false;
1191 }
1192 }
1193
1194 send_connect(this);
1195 return true;
1196 }
1197
Detach(std::string * error)1198 bool atransport::Detach(std::string* error) {
1199 D("%s: detach", serial.c_str());
1200 fdevent_check_looper();
1201
1202 if (!should_use_libusb()) {
1203 *error = "attach/detach only implemented for libusb backend";
1204 return false;
1205 }
1206
1207 if (GetConnectionState() == ConnectionState::kCsDetached) {
1208 *error = android::base::StringPrintf("transport %s is already detached", serial.c_str());
1209 return false;
1210 }
1211
1212 handle_offline(this);
1213
1214 {
1215 std::lock_guard<std::mutex> lock(mutex_);
1216 if (!connection_->Detach(error)) {
1217 return false;
1218 }
1219 }
1220
1221 this->SetConnectionState(kCsDetached);
1222 return true;
1223 }
1224 #endif
1225
SetConnection(std::shared_ptr<Connection> connection)1226 void atransport::SetConnection(std::shared_ptr<Connection> connection) {
1227 std::lock_guard<std::mutex> lock(mutex_);
1228 connection_ = std::shared_ptr<Connection>(std::move(connection));
1229 }
1230
HandleRead(std::unique_ptr<apacket> p)1231 bool atransport::HandleRead(std::unique_ptr<apacket> p) {
1232 if (!check_header(p.get(), this)) {
1233 D("%s: remote read: bad header", serial.c_str());
1234 return false;
1235 }
1236
1237 VLOG(TRANSPORT) << dump_packet(serial.c_str(), "from remote", p.get());
1238 apacket* packet = p.release();
1239
1240 // This needs to run on the looper thread since the associated fdevent
1241 // message pump exists in that context.
1242 fdevent_run_on_looper([packet, this]() { handle_packet(packet, this); });
1243
1244 return true;
1245 }
1246
HandleError(const std::string & error)1247 void atransport::HandleError(const std::string& error) {
1248 LOG(INFO) << serial_name() << ": connection terminated: " << error;
1249 fdevent_run_on_looper([this]() {
1250 handle_offline(this);
1251 transport_destroy(this);
1252 });
1253 }
1254
update_version(int version,size_t payload)1255 void atransport::update_version(int version, size_t payload) {
1256 protocol_version = std::min(version, A_VERSION);
1257 max_payload = std::min(payload, MAX_PAYLOAD);
1258 }
1259
get_protocol_version() const1260 int atransport::get_protocol_version() const {
1261 return protocol_version;
1262 }
1263
get_tls_version() const1264 int atransport::get_tls_version() const {
1265 return tls_version;
1266 }
1267
get_max_payload() const1268 size_t atransport::get_max_payload() const {
1269 return max_payload;
1270 }
1271
1272 #if ADB_HOST
delayed_ack_enabled()1273 static bool delayed_ack_enabled() {
1274 static const char* env = getenv("ADB_DELAYED_ACK");
1275 static bool result = env && strcmp(env, "1") == 0;
1276 return result;
1277 }
1278 #endif
1279
supported_features()1280 const FeatureSet& supported_features() {
1281 static const android::base::NoDestructor<FeatureSet> features([]() {
1282 // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1283 // to know about. Otherwise, the client can be stuck running an old
1284 // version of the server even after upgrading their copy of adb.
1285 // (http://b/24370690)
1286
1287 // clang-format off
1288 FeatureSet result {
1289 kFeatureShell2,
1290 kFeatureCmd,
1291 kFeatureStat2,
1292 kFeatureLs2,
1293 kFeatureFixedPushMkdir,
1294 kFeatureApex,
1295 kFeatureAbb,
1296 kFeatureFixedPushSymlinkTimestamp,
1297 kFeatureAbbExec,
1298 kFeatureRemountShell,
1299 kFeatureTrackApp,
1300 kFeatureSendRecv2,
1301 kFeatureSendRecv2Brotli,
1302 kFeatureSendRecv2LZ4,
1303 kFeatureSendRecv2Zstd,
1304 kFeatureSendRecv2DryRunSend,
1305 kFeatureOpenscreenMdns,
1306 };
1307 // clang-format on
1308
1309 #if ADB_HOST
1310 if (delayed_ack_enabled()) {
1311 result.push_back(kFeatureDelayedAck);
1312 }
1313 #else
1314 result.push_back(kFeatureDelayedAck);
1315 #endif
1316 return result;
1317 }());
1318
1319 return *features;
1320 }
1321
FeatureSetToString(const FeatureSet & features)1322 std::string FeatureSetToString(const FeatureSet& features) {
1323 return android::base::Join(features, ',');
1324 }
1325
StringToFeatureSet(const std::string & features_string)1326 FeatureSet StringToFeatureSet(const std::string& features_string) {
1327 if (features_string.empty()) {
1328 return FeatureSet();
1329 }
1330
1331 return android::base::Split(features_string, ",");
1332 }
1333
1334 template <class Range, class Value>
contains(const Range & r,const Value & v)1335 static bool contains(const Range& r, const Value& v) {
1336 return std::find(std::begin(r), std::end(r), v) != std::end(r);
1337 }
1338
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1339 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1340 return contains(feature_set, feature) && contains(supported_features(), feature);
1341 }
1342
has_feature(const std::string & feature) const1343 bool atransport::has_feature(const std::string& feature) const {
1344 return contains(features_, feature);
1345 }
1346
SetFeatures(const std::string & features_string)1347 void atransport::SetFeatures(const std::string& features_string) {
1348 features_ = StringToFeatureSet(features_string);
1349 delayed_ack_ = CanUseFeature(features_, kFeatureDelayedAck);
1350 }
1351
AddDisconnect(adisconnect * disconnect)1352 void atransport::AddDisconnect(adisconnect* disconnect) {
1353 disconnects_.push_back(disconnect);
1354 }
1355
RemoveDisconnect(adisconnect * disconnect)1356 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1357 disconnects_.remove(disconnect);
1358 }
1359
RunDisconnects()1360 void atransport::RunDisconnects() {
1361 for (const auto& disconnect : disconnects_) {
1362 disconnect->func(disconnect->opaque, this);
1363 }
1364 disconnects_.clear();
1365 }
1366
1367 #if ADB_HOST
MatchesTarget(const std::string & target) const1368 bool atransport::MatchesTarget(const std::string& target) const {
1369 if (!serial.empty()) {
1370 if (target == serial) {
1371 return true;
1372 } else if (type == kTransportLocal) {
1373 // Local transports can match [tcp:|udp:]<hostname>[:port].
1374 const char* local_target_ptr = target.c_str();
1375
1376 // For fastboot compatibility, ignore protocol prefixes.
1377 if (android::base::StartsWith(target, "tcp:") ||
1378 android::base::StartsWith(target, "udp:")) {
1379 local_target_ptr += 4;
1380 }
1381
1382 // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1383 std::string serial_host, error;
1384 int serial_port = -1;
1385 if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr,
1386 &error)) {
1387 // |target| may omit the port to default to ours.
1388 std::string target_host;
1389 int target_port = serial_port;
1390 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1391 nullptr, &error) &&
1392 serial_host == target_host && serial_port == target_port) {
1393 return true;
1394 }
1395 }
1396 }
1397 }
1398
1399 return (target == devpath) || qual_match(target, "product:", product, false) ||
1400 qual_match(target, "model:", model, true) ||
1401 qual_match(target, "device:", device, false);
1402 }
1403
SetConnectionEstablished(bool success)1404 void atransport::SetConnectionEstablished(bool success) {
1405 connection_waitable_->SetConnectionEstablished(success);
1406 }
1407
Reconnect()1408 ReconnectResult atransport::Reconnect() {
1409 return reconnect_(this);
1410 }
1411
1412 // We use newline as our delimiter, make sure to never output it.
sanitize(std::string str,bool alphanumeric)1413 static std::string sanitize(std::string str, bool alphanumeric) {
1414 auto pred = alphanumeric ? [](const char c) { return !isalnum(c); }
1415 : [](const char c) { return c == '\n'; };
1416 std::replace_if(str.begin(), str.end(), pred, '_');
1417 return str;
1418 }
1419
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1420 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1421 bool alphanumeric) {
1422 if (value.empty()) {
1423 return;
1424 }
1425
1426 *result += ' ';
1427 *result += key;
1428 *result += sanitize(value, alphanumeric);
1429 }
1430
append_transport(const atransport * t,std::string * result,bool long_listing)1431 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1432 std::string serial = t->serial;
1433 if (serial.empty()) {
1434 serial = "(no serial number)";
1435 }
1436
1437 if (!long_listing) {
1438 *result += serial;
1439 *result += '\t';
1440 *result += to_string(t->GetConnectionState());
1441 } else {
1442 android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1443 to_string(t->GetConnectionState()).c_str());
1444
1445 append_transport_info(result, "", t->devpath, false);
1446 append_transport_info(result, "product:", t->product, false);
1447 append_transport_info(result, "model:", t->model, true);
1448 append_transport_info(result, "device:", t->device, false);
1449
1450 // Put id at the end, so that anyone parsing the output here can always find it by scanning
1451 // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1452 *result += " transport_id:";
1453 *result += std::to_string(t->id);
1454 }
1455 *result += '\n';
1456 }
1457
list_transports(bool long_listing)1458 std::string list_transports(bool long_listing) {
1459 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1460
1461 auto sorted_transport_list = transport_list;
1462 sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1463 if (x->type != y->type) {
1464 return x->type < y->type;
1465 }
1466 return x->serial < y->serial;
1467 });
1468
1469 std::string result;
1470 for (const auto& t : sorted_transport_list) {
1471 append_transport(t, &result, long_listing);
1472 }
1473 return result;
1474 }
1475
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1476 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1477 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1478 for (auto& t : transport_list) {
1479 if (predicate(t)) {
1480 if (reset) {
1481 t->Reset();
1482 } else {
1483 t->Kick();
1484 }
1485 }
1486 }
1487 }
1488
1489 /* hack for osx */
close_usb_devices(bool reset)1490 void close_usb_devices(bool reset) {
1491 close_usb_devices([](const atransport*) { return true; }, reset);
1492 }
1493 #endif
1494
validate_transport_list(const std::list<atransport * > & list,const std::string & serial,atransport * t,int * error)1495 bool validate_transport_list(const std::list<atransport*>& list, const std::string& serial,
1496 atransport* t, int* error) {
1497 for (const auto& transport : list) {
1498 if (serial == transport->serial) {
1499 const std::string list_name(&list == &pending_list ? "pending" : "transport");
1500 VLOG(TRANSPORT) << "socket transport " << transport->serial << " is already in the "
1501 << list_name << " list and fails to register";
1502 delete t;
1503 if (error) *error = EALREADY;
1504 return false;
1505 }
1506 }
1507 return true;
1508 }
1509
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,bool use_tls,int * error)1510 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1511 atransport::ReconnectCallback reconnect, bool use_tls, int* error) {
1512 atransport* t = new atransport(std::move(reconnect), kCsOffline);
1513 t->use_tls = use_tls;
1514 t->serial = std::move(serial);
1515
1516 D("transport: %s init'ing for socket %d, on port %d", t->serial.c_str(), s.get(), port);
1517 if (init_socket_transport(t, std::move(s), port, local) < 0) {
1518 delete t;
1519 if (error) *error = errno;
1520 return false;
1521 }
1522
1523 std::unique_lock<std::recursive_mutex> lock(transport_lock);
1524 if (!validate_transport_list(pending_list, t->serial, t, error)) {
1525 return false;
1526 }
1527
1528 if (!validate_transport_list(transport_list, t->serial, t, error)) {
1529 return false;
1530 }
1531
1532 pending_list.push_front(t);
1533
1534 lock.unlock();
1535
1536 #if ADB_HOST
1537 auto waitable = t->connection_waitable();
1538 #endif
1539 register_transport(t);
1540
1541 if (local == 1) {
1542 // Do not wait for emulator transports.
1543 return true;
1544 }
1545
1546 #if ADB_HOST
1547 if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1548 if (error) *error = ETIMEDOUT;
1549 return false;
1550 }
1551
1552 if (t->GetConnectionState() == kCsUnauthorized) {
1553 if (error) *error = EPERM;
1554 return false;
1555 }
1556 #endif
1557
1558 return true;
1559 }
1560
1561 #if ADB_HOST
find_transport(const char * serial)1562 atransport* find_transport(const char* serial) {
1563 atransport* result = nullptr;
1564
1565 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1566 for (auto& t : transport_list) {
1567 if (strcmp(serial, t->serial.c_str()) == 0) {
1568 result = t;
1569 break;
1570 }
1571 }
1572
1573 return result;
1574 }
1575
kick_all_tcp_devices()1576 void kick_all_tcp_devices() {
1577 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1578 for (auto& t : transport_list) {
1579 if (t->IsTcpDevice()) {
1580 // Kicking breaks the read_transport thread of this transport out of any read, then
1581 // the read_transport thread will notify the main thread to make this transport
1582 // offline. Then the main thread will notify the write_transport thread to exit.
1583 // Finally, this transport will be closed and freed in the main thread.
1584 t->Kick();
1585 }
1586 }
1587 reconnect_handler.CheckForKicked();
1588 }
1589
register_usb_transport(std::shared_ptr<Connection> connection,const char * serial,const char * devpath,unsigned writeable)1590 void register_usb_transport(std::shared_ptr<Connection> connection, const char* serial,
1591 const char* devpath, unsigned writeable) {
1592 atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1593 if (serial) {
1594 t->serial = serial;
1595 }
1596 if (devpath) {
1597 t->devpath = devpath;
1598 }
1599
1600 t->SetConnection(std::move(connection));
1601 t->type = kTransportUsb;
1602
1603 {
1604 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1605 pending_list.push_front(t);
1606 }
1607
1608 register_transport(t);
1609 }
1610
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1611 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1612 unsigned writeable) {
1613 atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1614
1615 D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1616 init_usb_transport(t, usb);
1617 if (serial) {
1618 t->serial = serial;
1619 }
1620
1621 if (devpath) {
1622 t->devpath = devpath;
1623 }
1624
1625 {
1626 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1627 pending_list.push_front(t);
1628 }
1629
1630 register_transport(t);
1631 }
1632
1633 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1634 void unregister_usb_transport(usb_handle* usb) {
1635 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1636 transport_list.remove_if([usb](atransport* t) {
1637 return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1638 });
1639 }
1640
1641 // Track reverse:forward commands, so that info can be used to develop
1642 // an 'allow-list':
1643 // - adb reverse tcp:<device_port> localhost:<host_port> : responds with the
1644 // device_port
1645 // - adb reverse --remove tcp:<device_port> : responds OKAY
1646 // - adb reverse --remove-all : responds OKAY
UpdateReverseConfig(std::string_view service_addr)1647 void atransport::UpdateReverseConfig(std::string_view service_addr) {
1648 fdevent_check_looper();
1649 if (!android::base::ConsumePrefix(&service_addr, "reverse:")) {
1650 return;
1651 }
1652
1653 if (android::base::ConsumePrefix(&service_addr, "forward:")) {
1654 // forward:[norebind:]<remote>;<local>
1655 bool norebind = android::base::ConsumePrefix(&service_addr, "norebind:");
1656 auto it = service_addr.find(';');
1657 if (it == std::string::npos) {
1658 return;
1659 }
1660 std::string remote(service_addr.substr(0, it));
1661
1662 if (norebind && reverse_forwards_.find(remote) != reverse_forwards_.end()) {
1663 // This will fail, don't update the map.
1664 LOG(DEBUG) << "ignoring reverse forward that will fail due to norebind";
1665 return;
1666 }
1667
1668 std::string local(service_addr.substr(it + 1));
1669 reverse_forwards_[remote] = local;
1670 } else if (android::base::ConsumePrefix(&service_addr, "killforward:")) {
1671 // kill-forward:<remote>
1672 auto it = service_addr.find(';');
1673 if (it != std::string::npos) {
1674 return;
1675 }
1676 reverse_forwards_.erase(std::string(service_addr));
1677 } else if (service_addr == "killforward-all") {
1678 reverse_forwards_.clear();
1679 } else if (service_addr == "list-forward") {
1680 LOG(DEBUG) << __func__ << " ignoring --list";
1681 } else { // Anything else we need to know about?
1682 LOG(FATAL) << "unhandled reverse service: " << service_addr;
1683 }
1684 }
1685
1686 // Is this an authorized :connect request?
IsReverseConfigured(const std::string & local_addr)1687 bool atransport::IsReverseConfigured(const std::string& local_addr) {
1688 fdevent_check_looper();
1689 for (const auto& [remote, local] : reverse_forwards_) {
1690 if (local == local_addr) {
1691 return true;
1692 }
1693 }
1694 return false;
1695 }
1696
1697 #endif
1698
check_header(apacket * p,atransport * t)1699 bool check_header(apacket* p, atransport* t) {
1700 if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1701 VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1702 << ", magic = " << p->msg.magic;
1703 return false;
1704 }
1705
1706 if (p->msg.data_length > t->get_max_payload()) {
1707 VLOG(RWX) << "check_header(): " << p->msg.data_length
1708 << " atransport::max_payload = " << t->get_max_payload();
1709 return false;
1710 }
1711
1712 return true;
1713 }
1714
1715 #if ADB_HOST
Key()1716 std::shared_ptr<RSA> atransport::Key() {
1717 if (keys_.empty()) {
1718 return nullptr;
1719 }
1720
1721 std::shared_ptr<RSA> result = keys_[0];
1722 return result;
1723 }
1724
NextKey()1725 std::shared_ptr<RSA> atransport::NextKey() {
1726 if (keys_.empty()) {
1727 LOG(INFO) << "fetching keys for transport " << this->serial_name();
1728 keys_ = adb_auth_get_private_keys();
1729
1730 // We should have gotten at least one key: the one that's automatically generated.
1731 CHECK(!keys_.empty());
1732 } else {
1733 keys_.pop_front();
1734 }
1735
1736 return Key();
1737 }
1738
ResetKeys()1739 void atransport::ResetKeys() {
1740 keys_.clear();
1741 }
1742 #endif
1743