• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG TRANSPORT
18 
19 #include "sysdeps.h"
20 
21 #include "transport.h"
22 
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 
31 #include <algorithm>
32 #include <deque>
33 #include <list>
34 #include <memory>
35 #include <mutex>
36 #include <set>
37 #include <thread>
38 
39 #include <android-base/logging.h>
40 #include <android-base/parsenetaddress.h>
41 #include <android-base/stringprintf.h>
42 #include <android-base/strings.h>
43 #include <android-base/thread_annotations.h>
44 
45 #include <diagnose_usb.h>
46 
47 #include "adb.h"
48 #include "adb_auth.h"
49 #include "adb_io.h"
50 #include "adb_trace.h"
51 #include "adb_utils.h"
52 #include "fdevent.h"
53 #include "sysdeps/chrono.h"
54 
55 using android::base::ScopedLockAssertion;
56 
57 static void remove_transport(atransport* transport);
58 static void transport_unref(atransport* transport);
59 
// TODO: unordered_map<TransportId, atransport*>
// Transports that have completed registration (see transport_registration_func()).
static auto& transport_list = *new std::list<atransport*>();
// Transports that have not yet been moved into transport_list.
static auto& pending_list = *new std::list<atransport*>();

// Taken around accesses to transport_list and pending_list in this file.
static auto& transport_lock = *new std::recursive_mutex();
65 
// String identifiers for optional protocol features.
// NOTE(review): presumably exchanged with the peer as part of a feature set — confirm against
// the users of these constants, which are not visible here.
const char* const kFeatureShell2 = "shell_v2";
const char* const kFeatureCmd = "cmd";
const char* const kFeatureStat2 = "stat_v2";
const char* const kFeatureLibusb = "libusb";
const char* const kFeaturePushSync = "push_sync";
const char* const kFeatureApex = "apex";
const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
const char* const kFeatureAbb = "abb";
const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
const char* const kFeatureAbbExec = "abb_exec";
76 
77 namespace {
78 
79 #if ADB_HOST
80 // Tracks and handles atransport*s that are attempting reconnection.
81 class ReconnectHandler {
82   public:
83     ReconnectHandler() = default;
84     ~ReconnectHandler() = default;
85 
86     // Starts the ReconnectHandler thread.
87     void Start();
88 
89     // Requests the ReconnectHandler thread to stop.
90     void Stop();
91 
92     // Adds the atransport* to the queue of reconnect attempts.
93     void TrackTransport(atransport* transport);
94 
95     // Wake up the ReconnectHandler thread to have it check for kicked transports.
96     void CheckForKicked();
97 
98   private:
99     // The main thread loop.
100     void Run();
101 
102     // Tracks a reconnection attempt.
103     struct ReconnectAttempt {
104         atransport* transport;
105         std::chrono::steady_clock::time_point reconnect_time;
106         size_t attempts_left;
107 
operator <__anon505010880111::ReconnectHandler::ReconnectAttempt108         bool operator<(const ReconnectAttempt& rhs) const {
109             if (reconnect_time == rhs.reconnect_time) {
110                 return reinterpret_cast<uintptr_t>(transport) <
111                        reinterpret_cast<uintptr_t>(rhs.transport);
112             }
113             return reconnect_time < rhs.reconnect_time;
114         }
115     };
116 
117     // Only retry for up to one minute.
118     static constexpr const std::chrono::seconds kDefaultTimeout = 10s;
119     static constexpr const size_t kMaxAttempts = 6;
120 
121     // Protects all members.
122     std::mutex reconnect_mutex_;
123     bool running_ GUARDED_BY(reconnect_mutex_) = true;
124     std::thread handler_thread_;
125     std::condition_variable reconnect_cv_;
126     std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
127 
128     DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
129 };
130 
Start()131 void ReconnectHandler::Start() {
132     check_main_thread();
133     handler_thread_ = std::thread(&ReconnectHandler::Run, this);
134 }
135 
Stop()136 void ReconnectHandler::Stop() {
137     check_main_thread();
138     {
139         std::lock_guard<std::mutex> lock(reconnect_mutex_);
140         running_ = false;
141     }
142     reconnect_cv_.notify_one();
143     handler_thread_.join();
144 
145     // Drain the queue to free all resources.
146     std::lock_guard<std::mutex> lock(reconnect_mutex_);
147     while (!reconnect_queue_.empty()) {
148         ReconnectAttempt attempt = *reconnect_queue_.begin();
149         reconnect_queue_.erase(reconnect_queue_.begin());
150         remove_transport(attempt.transport);
151     }
152 }
153 
TrackTransport(atransport * transport)154 void ReconnectHandler::TrackTransport(atransport* transport) {
155     check_main_thread();
156     {
157         std::lock_guard<std::mutex> lock(reconnect_mutex_);
158         if (!running_) return;
159         // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
160         auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
161         reconnect_queue_.emplace(
162                 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
163     }
164     reconnect_cv_.notify_one();
165 }
166 
CheckForKicked()167 void ReconnectHandler::CheckForKicked() {
168     reconnect_cv_.notify_one();
169 }
170 
Run()171 void ReconnectHandler::Run() {
172     while (true) {
173         ReconnectAttempt attempt;
174         {
175             std::unique_lock<std::mutex> lock(reconnect_mutex_);
176             ScopedLockAssertion assume_lock(reconnect_mutex_);
177 
178             if (!reconnect_queue_.empty()) {
179                 // FIXME: libstdc++ (used on Windows) implements condition_variable with
180                 //        system_clock as its clock, so we're probably hosed if the clock changes,
181                 //        even if we use steady_clock throughout. This problem goes away once we
182                 //        switch to libc++.
183                 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
184             } else {
185                 reconnect_cv_.wait(lock);
186             }
187 
188             if (!running_) return;
189 
190             // Scan the whole list for kicked transports, so that we immediately handle an explicit
191             // disconnect request.
192             bool kicked = false;
193             for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
194                 if (it->transport->kicked()) {
195                     D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
196                     remove_transport(it->transport);
197                     it = reconnect_queue_.erase(it);
198                 } else {
199                     ++it;
200                 }
201                 kicked = true;
202             }
203 
204             if (reconnect_queue_.empty()) continue;
205 
206             // Go back to sleep if we either woke up spuriously, or we were woken up to remove
207             // a kicked transport, and the first transport isn't ready for reconnection yet.
208             auto now = std::chrono::steady_clock::now();
209             if (reconnect_queue_.begin()->reconnect_time > now) {
210                 continue;
211             }
212 
213             attempt = *reconnect_queue_.begin();
214             reconnect_queue_.erase(reconnect_queue_.begin());
215         }
216         D("attempting to reconnect %s", attempt.transport->serial.c_str());
217 
218         switch (attempt.transport->Reconnect()) {
219             case ReconnectResult::Retry: {
220                 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
221                 if (attempt.attempts_left == 0) {
222                     D("transport %s exceeded the number of retry attempts. giving up on it.",
223                       attempt.transport->serial.c_str());
224                     remove_transport(attempt.transport);
225                     continue;
226                 }
227 
228                 std::lock_guard<std::mutex> lock(reconnect_mutex_);
229                 reconnect_queue_.emplace(ReconnectAttempt{
230                         attempt.transport,
231                         std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
232                         attempt.attempts_left - 1});
233                 continue;
234             }
235 
236             case ReconnectResult::Success:
237                 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
238                 register_transport(attempt.transport);
239                 continue;
240 
241             case ReconnectResult::Abort:
242                 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
243                 remove_transport(attempt.transport);
244                 continue;
245         }
246     }
247 }
248 
// The single ReconnectHandler instance used by this file; started by init_reconnect_handler()
// and stopped by kick_all_transports().
static auto& reconnect_handler = *new ReconnectHandler();
250 
251 #endif
252 
253 }  // namespace
254 
NextTransportId()255 TransportId NextTransportId() {
256     static std::atomic<TransportId> next(1);
257     return next++;
258 }
259 
Reset()260 void Connection::Reset() {
261     LOG(INFO) << "Connection::Reset(): stopping";
262     Stop();
263 }
264 
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)265 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
266     : underlying_(std::move(connection)) {}
267 
~BlockingConnectionAdapter()268 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
269     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): destructing";
270     Stop();
271 }
272 
Start()273 void BlockingConnectionAdapter::Start() {
274     std::lock_guard<std::mutex> lock(mutex_);
275     if (started_) {
276         LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_
277                    << "): started multiple times";
278     }
279 
280     read_thread_ = std::thread([this]() {
281         LOG(INFO) << this->transport_name_ << ": read thread spawning";
282         while (true) {
283             auto packet = std::make_unique<apacket>();
284             if (!underlying_->Read(packet.get())) {
285                 PLOG(INFO) << this->transport_name_ << ": read failed";
286                 break;
287             }
288             read_callback_(this, std::move(packet));
289         }
290         std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "read failed"); });
291     });
292 
293     write_thread_ = std::thread([this]() {
294         LOG(INFO) << this->transport_name_ << ": write thread spawning";
295         while (true) {
296             std::unique_lock<std::mutex> lock(mutex_);
297             ScopedLockAssertion assume_locked(mutex_);
298             cv_.wait(lock, [this]() REQUIRES(mutex_) {
299                 return this->stopped_ || !this->write_queue_.empty();
300             });
301 
302             if (this->stopped_) {
303                 return;
304             }
305 
306             std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
307             this->write_queue_.pop_front();
308             lock.unlock();
309 
310             if (!this->underlying_->Write(packet.get())) {
311                 break;
312             }
313         }
314         std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); });
315     });
316 
317     started_ = true;
318 }
319 
Reset()320 void BlockingConnectionAdapter::Reset() {
321     {
322         std::lock_guard<std::mutex> lock(mutex_);
323         if (!started_) {
324             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
325             return;
326         }
327 
328         if (stopped_) {
329             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
330                       << "): already stopped";
331             return;
332         }
333     }
334 
335     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): resetting";
336     this->underlying_->Reset();
337     Stop();
338 }
339 
Stop()340 void BlockingConnectionAdapter::Stop() {
341     {
342         std::lock_guard<std::mutex> lock(mutex_);
343         if (!started_) {
344             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
345             return;
346         }
347 
348         if (stopped_) {
349             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
350                       << "): already stopped";
351             return;
352         }
353 
354         stopped_ = true;
355     }
356 
357     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping";
358 
359     this->underlying_->Close();
360     this->cv_.notify_one();
361 
362     // Move the threads out into locals with the lock taken, and then unlock to let them exit.
363     std::thread read_thread;
364     std::thread write_thread;
365 
366     {
367         std::lock_guard<std::mutex> lock(mutex_);
368         read_thread = std::move(read_thread_);
369         write_thread = std::move(write_thread_);
370     }
371 
372     read_thread.join();
373     write_thread.join();
374 
375     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped";
376     std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); });
377 }
378 
Write(std::unique_ptr<apacket> packet)379 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
380     {
381         std::lock_guard<std::mutex> lock(this->mutex_);
382         write_queue_.emplace_back(std::move(packet));
383     }
384 
385     cv_.notify_one();
386     return true;
387 }
388 
Read(apacket * packet)389 bool FdConnection::Read(apacket* packet) {
390     if (!ReadFdExactly(fd_.get(), &packet->msg, sizeof(amessage))) {
391         D("remote local: read terminated (message)");
392         return false;
393     }
394 
395     if (packet->msg.data_length > MAX_PAYLOAD) {
396         D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
397         return false;
398     }
399 
400     packet->payload.resize(packet->msg.data_length);
401 
402     if (!ReadFdExactly(fd_.get(), &packet->payload[0], packet->payload.size())) {
403         D("remote local: terminated (data)");
404         return false;
405     }
406 
407     return true;
408 }
409 
Write(apacket * packet)410 bool FdConnection::Write(apacket* packet) {
411     if (!WriteFdExactly(fd_.get(), &packet->msg, sizeof(packet->msg))) {
412         D("remote local: write terminated");
413         return false;
414     }
415 
416     if (packet->msg.data_length) {
417         if (!WriteFdExactly(fd_.get(), &packet->payload[0], packet->msg.data_length)) {
418             D("remote local: write terminated");
419             return false;
420         }
421     }
422 
423     return true;
424 }
425 
Close()426 void FdConnection::Close() {
427     adb_shutdown(fd_.get());
428     fd_.reset();
429 }
430 
send_packet(apacket * p,atransport * t)431 void send_packet(apacket* p, atransport* t) {
432     p->msg.magic = p->msg.command ^ 0xffffffff;
433     // compute a checksum for connection/auth packets for compatibility reasons
434     if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
435         p->msg.data_check = 0;
436     } else {
437         p->msg.data_check = calculate_apacket_checksum(p);
438     }
439 
440     VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
441 
442     if (t == nullptr) {
443         LOG(FATAL) << "Transport is null";
444     }
445 
446     if (t->Write(p) != 0) {
447         D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
448         t->Kick();
449     }
450 }
451 
kick_transport(atransport * t,bool reset)452 void kick_transport(atransport* t, bool reset) {
453     std::lock_guard<std::recursive_mutex> lock(transport_lock);
454     // As kick_transport() can be called from threads without guarantee that t is valid,
455     // check if the transport is in transport_list first.
456     //
457     // TODO(jmgao): WTF? Is this actually true?
458     if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
459         if (reset) {
460             t->Reset();
461         } else {
462             t->Kick();
463         }
464     }
465 
466 #if ADB_HOST
467     reconnect_handler.CheckForKicked();
468 #endif
469 }
470 
// Socketpair ends used to hand tmsg requests to the fdevent loop: register_transport() and
// remove_transport() write to the send end; transport_registration_func() reads the recv end.
static int transport_registration_send = -1;
static int transport_registration_recv = -1;
// fdevent created in init_transport_registration() to watch transport_registration_recv.
static fdevent* transport_registration_fde;
474 
475 #if ADB_HOST
476 
477 /* this adds support required by the 'track-devices' service.
478  * this is used to send the content of "list_transport" to any
479  * number of client connections that want it through a single
480  * live TCP connection
481  */
struct device_tracker {
    // Kept as the first member: the socket callbacks cast their asocket* back to device_tracker*.
    asocket socket;
    // Set at creation; cleared once device_tracker_ready() has sent the device list.
    bool update_needed = false;
    // Passed to list_transports() to select the output format.
    bool long_output = false;
    // Next tracker in device_tracker_list.
    device_tracker* next = nullptr;
};
488 
/* linked list of all device trackers; new trackers are pushed at the head */
static device_tracker* device_tracker_list;
491 
device_tracker_remove(device_tracker * tracker)492 static void device_tracker_remove(device_tracker* tracker) {
493     device_tracker** pnode = &device_tracker_list;
494     device_tracker* node = *pnode;
495 
496     std::lock_guard<std::recursive_mutex> lock(transport_lock);
497     while (node) {
498         if (node == tracker) {
499             *pnode = node->next;
500             break;
501         }
502         pnode = &node->next;
503         node = *pnode;
504     }
505 }
506 
device_tracker_close(asocket * socket)507 static void device_tracker_close(asocket* socket) {
508     device_tracker* tracker = (device_tracker*)socket;
509     asocket* peer = socket->peer;
510 
511     D("device tracker %p removed", tracker);
512     if (peer) {
513         peer->peer = nullptr;
514         peer->close(peer);
515     }
516     device_tracker_remove(tracker);
517     delete tracker;
518 }
519 
device_tracker_enqueue(asocket * socket,apacket::payload_type)520 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
521     /* you can't read from a device tracker, close immediately */
522     device_tracker_close(socket);
523     return -1;
524 }
525 
device_tracker_send(device_tracker * tracker,const std::string & string)526 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
527     asocket* peer = tracker->socket.peer;
528 
529     apacket::payload_type data;
530     data.resize(4 + string.size());
531     char buf[5];
532     snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
533     memcpy(&data[0], buf, 4);
534     memcpy(&data[4], string.data(), string.size());
535     return peer->enqueue(peer, std::move(data));
536 }
537 
device_tracker_ready(asocket * socket)538 static void device_tracker_ready(asocket* socket) {
539     device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
540 
541     // We want to send the device list when the tracker connects
542     // for the first time, even if no update occurred.
543     if (tracker->update_needed) {
544         tracker->update_needed = false;
545 
546         std::string transports = list_transports(tracker->long_output);
547         device_tracker_send(tracker, transports);
548     }
549 }
550 
create_device_tracker(bool long_output)551 asocket* create_device_tracker(bool long_output) {
552     device_tracker* tracker = new device_tracker();
553     if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
554 
555     D("device tracker %p created", tracker);
556 
557     tracker->socket.enqueue = device_tracker_enqueue;
558     tracker->socket.ready = device_tracker_ready;
559     tracker->socket.close = device_tracker_close;
560     tracker->update_needed = true;
561     tracker->long_output = long_output;
562 
563     tracker->next = device_tracker_list;
564     device_tracker_list = tracker;
565 
566     return &tracker->socket;
567 }
568 
569 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)570 bool iterate_transports(std::function<bool(const atransport*)> fn) {
571     std::lock_guard<std::recursive_mutex> lock(transport_lock);
572     for (const auto& t : transport_list) {
573         if (!fn(t)) {
574             return false;
575         }
576     }
577     for (const auto& t : pending_list) {
578         if (!fn(t)) {
579             return false;
580         }
581     }
582     return true;
583 }
584 
585 // Call this function each time the transport list has changed.
update_transports()586 void update_transports() {
587     update_transport_status();
588 
589     // Notify `adb track-devices` clients.
590     std::string transports = list_transports(false);
591 
592     device_tracker* tracker = device_tracker_list;
593     while (tracker != nullptr) {
594         device_tracker* next = tracker->next;
595         // This may destroy the tracker if the connection is closed.
596         device_tracker_send(tracker, transports);
597         tracker = next;
598     }
599 }
600 
601 #else
602 
// Non-host build of update_transports(): intentionally a no-op.
void update_transports() {
    // Nothing to do on the device side.
}
606 
607 #endif  // ADB_HOST
608 
// Message passed over the transport registration socketpair.
struct tmsg {
    // Transport that the request applies to.
    atransport* transport;
    // 0 requests removal (see remove_transport()); register_transport() sends 1.
    int action;
};
613 
transport_read_action(int fd,struct tmsg * m)614 static int transport_read_action(int fd, struct tmsg* m) {
615     char* p = (char*)m;
616     int len = sizeof(*m);
617     int r;
618 
619     while (len > 0) {
620         r = adb_read(fd, p, len);
621         if (r > 0) {
622             len -= r;
623             p += r;
624         } else {
625             D("transport_read_action: on fd %d: %s", fd, strerror(errno));
626             return -1;
627         }
628     }
629     return 0;
630 }
631 
transport_write_action(int fd,struct tmsg * m)632 static int transport_write_action(int fd, struct tmsg* m) {
633     char* p = (char*)m;
634     int len = sizeof(*m);
635     int r;
636 
637     while (len > 0) {
638         r = adb_write(fd, p, len);
639         if (r > 0) {
640             len -= r;
641             p += r;
642         } else {
643             D("transport_write_action: on fd %d: %s", fd, strerror(errno));
644             return -1;
645         }
646     }
647     return 0;
648 }
649 
transport_registration_func(int _fd,unsigned ev,void *)650 static void transport_registration_func(int _fd, unsigned ev, void*) {
651     tmsg m;
652     atransport* t;
653 
654     if (!(ev & FDE_READ)) {
655         return;
656     }
657 
658     if (transport_read_action(_fd, &m)) {
659         PLOG(FATAL) << "cannot read transport registration socket";
660     }
661 
662     t = m.transport;
663 
664     if (m.action == 0) {
665         D("transport: %s deleting", t->serial.c_str());
666 
667         {
668             std::lock_guard<std::recursive_mutex> lock(transport_lock);
669             transport_list.remove(t);
670         }
671 
672         delete t;
673 
674         update_transports();
675         return;
676     }
677 
678     /* don't create transport threads for inaccessible devices */
679     if (t->GetConnectionState() != kCsNoPerm) {
680         // The connection gets a reference to the atransport. It will release it
681         // upon a read/write error.
682         t->ref_count++;
683         t->connection()->SetTransportName(t->serial_name());
684         t->connection()->SetReadCallback([t](Connection*, std::unique_ptr<apacket> p) {
685             if (!check_header(p.get(), t)) {
686                 D("%s: remote read: bad header", t->serial.c_str());
687                 return false;
688             }
689 
690             VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "from remote", p.get());
691             apacket* packet = p.release();
692 
693             // TODO: Does this need to run on the main thread?
694             fdevent_run_on_main_thread([packet, t]() { handle_packet(packet, t); });
695             return true;
696         });
697         t->connection()->SetErrorCallback([t](Connection*, const std::string& error) {
698             LOG(INFO) << t->serial_name() << ": connection terminated: " << error;
699             fdevent_run_on_main_thread([t]() {
700                 handle_offline(t);
701                 transport_unref(t);
702             });
703         });
704 
705         t->connection()->Start();
706 #if ADB_HOST
707         send_connect(t);
708 #endif
709     }
710 
711     {
712         std::lock_guard<std::recursive_mutex> lock(transport_lock);
713         auto it = std::find(pending_list.begin(), pending_list.end(), t);
714         if (it != pending_list.end()) {
715             pending_list.remove(t);
716             transport_list.push_front(t);
717         }
718     }
719 
720     update_transports();
721 }
722 
723 #if ADB_HOST
init_reconnect_handler(void)724 void init_reconnect_handler(void) {
725     reconnect_handler.Start();
726 }
727 #endif
728 
init_transport_registration(void)729 void init_transport_registration(void) {
730     int s[2];
731 
732     if (adb_socketpair(s)) {
733         PLOG(FATAL) << "cannot open transport registration socketpair";
734     }
735     D("socketpair: (%d,%d)", s[0], s[1]);
736 
737     transport_registration_send = s[0];
738     transport_registration_recv = s[1];
739 
740     transport_registration_fde =
741         fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
742     fdevent_set(transport_registration_fde, FDE_READ);
743 }
744 
kick_all_transports()745 void kick_all_transports() {
746 #if ADB_HOST
747     reconnect_handler.Stop();
748 #endif
749     // To avoid only writing part of a packet to a transport after exit, kick all transports.
750     std::lock_guard<std::recursive_mutex> lock(transport_lock);
751     for (auto t : transport_list) {
752         t->Kick();
753     }
754 }
755 
756 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)757 void register_transport(atransport* transport) {
758     tmsg m;
759     m.transport = transport;
760     m.action = 1;
761     D("transport: %s registered", transport->serial.c_str());
762     if (transport_write_action(transport_registration_send, &m)) {
763         PLOG(FATAL) << "cannot write transport registration socket";
764     }
765 }
766 
remove_transport(atransport * transport)767 static void remove_transport(atransport* transport) {
768     tmsg m;
769     m.transport = transport;
770     m.action = 0;
771     D("transport: %s removed", transport->serial.c_str());
772     if (transport_write_action(transport_registration_send, &m)) {
773         PLOG(FATAL) << "cannot write transport registration socket";
774     }
775 }
776 
transport_unref(atransport * t)777 static void transport_unref(atransport* t) {
778     check_main_thread();
779     CHECK(t != nullptr);
780 
781     std::lock_guard<std::recursive_mutex> lock(transport_lock);
782     CHECK_GT(t->ref_count, 0u);
783     t->ref_count--;
784     if (t->ref_count == 0) {
785         LOG(INFO) << "destroying transport " << t->serial_name();
786         t->connection()->Stop();
787 #if ADB_HOST
788         if (t->IsTcpDevice() && !t->kicked()) {
789             D("transport: %s unref (attempting reconnection)", t->serial.c_str());
790 
791             // We need to clear the transport's keys, so that on the next connection, it tries
792             // again from the beginning.
793             t->ResetKeys();
794             reconnect_handler.TrackTransport(t);
795         } else {
796             D("transport: %s unref (kicking and closing)", t->serial.c_str());
797             remove_transport(t);
798         }
799 #else
800         D("transport: %s unref (kicking and closing)", t->serial.c_str());
801         remove_transport(t);
802 #endif
803 
804     } else {
805         D("transport: %s unref (count=%zu)", t->serial.c_str(), t->ref_count);
806     }
807 }
808 
// Tests whether `to_test` equals `prefix` (optional, may be null) followed by `qual`.
//
// When `sanitize_qual` is true, every non-alphanumeric character of `qual` is compared as '_'.
// Two empty strings match each other; an empty `qual` never matches a non-empty `to_test`.
// Returns non-zero on a match and 0 otherwise.
static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
                      bool sanitize_qual) {
    if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
        return qual.empty();

    if (qual.empty()) return 0;

    const char* ptr = to_test.c_str();
    if (prefix) {
        while (*prefix) {
            if (*prefix++ != *ptr++) return 0;
        }
    }

    for (char ch : qual) {
        // isalnum() is only defined for values representable as unsigned char (or EOF), so
        // convert first; a plain char may be negative for non-ASCII bytes.
        if (sanitize_qual && !isalnum(static_cast<unsigned char>(ch))) ch = '_';
        if (ch != *ptr++) return 0;
    }

    /* Everything matched so far.  Return true if *ptr is a NUL. */
    return !*ptr;
}
831 
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)832 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
833                                   bool* is_ambiguous, std::string* error_out,
834                                   bool accept_any_state) {
835     atransport* result = nullptr;
836 
837     if (transport_id != 0) {
838         *error_out =
839             android::base::StringPrintf("no device with transport id '%" PRIu64 "'", transport_id);
840     } else if (serial) {
841         *error_out = android::base::StringPrintf("device '%s' not found", serial);
842     } else if (type == kTransportLocal) {
843         *error_out = "no emulators found";
844     } else if (type == kTransportAny) {
845         *error_out = "no devices/emulators found";
846     } else {
847         *error_out = "no devices found";
848     }
849 
850     std::unique_lock<std::recursive_mutex> lock(transport_lock);
851     for (const auto& t : transport_list) {
852         if (t->GetConnectionState() == kCsNoPerm) {
853             *error_out = UsbNoPermissionsLongHelpText();
854             continue;
855         }
856 
857         if (transport_id) {
858             if (t->id == transport_id) {
859                 result = t;
860                 break;
861             }
862         } else if (serial) {
863             if (t->MatchesTarget(serial)) {
864                 if (result) {
865                     *error_out = "more than one device";
866                     if (is_ambiguous) *is_ambiguous = true;
867                     result = nullptr;
868                     break;
869                 }
870                 result = t;
871             }
872         } else {
873             if (type == kTransportUsb && t->type == kTransportUsb) {
874                 if (result) {
875                     *error_out = "more than one device";
876                     if (is_ambiguous) *is_ambiguous = true;
877                     result = nullptr;
878                     break;
879                 }
880                 result = t;
881             } else if (type == kTransportLocal && t->type == kTransportLocal) {
882                 if (result) {
883                     *error_out = "more than one emulator";
884                     if (is_ambiguous) *is_ambiguous = true;
885                     result = nullptr;
886                     break;
887                 }
888                 result = t;
889             } else if (type == kTransportAny) {
890                 if (result) {
891                     *error_out = "more than one device/emulator";
892                     if (is_ambiguous) *is_ambiguous = true;
893                     result = nullptr;
894                     break;
895                 }
896                 result = t;
897             }
898         }
899     }
900     lock.unlock();
901 
902     if (result && !accept_any_state) {
903         // The caller requires an active transport.
904         // Make sure that we're actually connected.
905         ConnectionState state = result->GetConnectionState();
906         switch (state) {
907             case kCsConnecting:
908                 *error_out = "device still connecting";
909                 result = nullptr;
910                 break;
911 
912             case kCsAuthorizing:
913                 *error_out = "device still authorizing";
914                 result = nullptr;
915                 break;
916 
917             case kCsUnauthorized: {
918                 *error_out = "device unauthorized.\n";
919                 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
920                 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
921                 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
922                 *error_out += "\n";
923                 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
924                 *error_out += "Otherwise check for a confirmation dialog on your device.";
925                 result = nullptr;
926                 break;
927             }
928 
929             case kCsOffline:
930                 *error_out = "device offline";
931                 result = nullptr;
932                 break;
933 
934             default:
935                 break;
936         }
937     }
938 
939     if (result) {
940         *error_out = "success";
941     }
942 
943     return result;
944 }
945 
WaitForConnection(std::chrono::milliseconds timeout)946 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
947     std::unique_lock<std::mutex> lock(mutex_);
948     ScopedLockAssertion assume_locked(mutex_);
949     return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
950         return connection_established_ready_;
951     }) && connection_established_;
952 }
953 
SetConnectionEstablished(bool success)954 void ConnectionWaitable::SetConnectionEstablished(bool success) {
955     {
956         std::lock_guard<std::mutex> lock(mutex_);
957         if (connection_established_ready_) return;
958         connection_established_ready_ = true;
959         connection_established_ = success;
960         D("connection established with %d", success);
961     }
962     cv_.notify_one();
963 }
964 
~atransport()965 atransport::~atransport() {
966     // If the connection callback had not been run before, run it now.
967     SetConnectionEstablished(false);
968 }
969 
Write(apacket * p)970 int atransport::Write(apacket* p) {
971     return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
972 }
973 
Reset()974 void atransport::Reset() {
975     if (!kicked_.exchange(true)) {
976         LOG(INFO) << "resetting transport " << this << " " << this->serial;
977         this->connection()->Reset();
978     }
979 }
980 
Kick()981 void atransport::Kick() {
982     if (!kicked_.exchange(true)) {
983         LOG(INFO) << "kicking transport " << this << " " << this->serial;
984         this->connection()->Stop();
985     }
986 }
987 
GetConnectionState() const988 ConnectionState atransport::GetConnectionState() const {
989     return connection_state_;
990 }
991 
SetConnectionState(ConnectionState state)992 void atransport::SetConnectionState(ConnectionState state) {
993     check_main_thread();
994     connection_state_ = state;
995 }
996 
SetConnection(std::unique_ptr<Connection> connection)997 void atransport::SetConnection(std::unique_ptr<Connection> connection) {
998     std::lock_guard<std::mutex> lock(mutex_);
999     connection_ = std::shared_ptr<Connection>(std::move(connection));
1000 }
1001 
connection_state_name() const1002 std::string atransport::connection_state_name() const {
1003     ConnectionState state = GetConnectionState();
1004     switch (state) {
1005         case kCsOffline:
1006             return "offline";
1007         case kCsBootloader:
1008             return "bootloader";
1009         case kCsDevice:
1010             return "device";
1011         case kCsHost:
1012             return "host";
1013         case kCsRecovery:
1014             return "recovery";
1015         case kCsRescue:
1016             return "rescue";
1017         case kCsNoPerm:
1018             return UsbNoPermissionsShortHelpText();
1019         case kCsSideload:
1020             return "sideload";
1021         case kCsUnauthorized:
1022             return "unauthorized";
1023         case kCsAuthorizing:
1024             return "authorizing";
1025         case kCsConnecting:
1026             return "connecting";
1027         default:
1028             return "unknown";
1029     }
1030 }
1031 
update_version(int version,size_t payload)1032 void atransport::update_version(int version, size_t payload) {
1033     protocol_version = std::min(version, A_VERSION);
1034     max_payload = std::min(payload, MAX_PAYLOAD);
1035 }
1036 
get_protocol_version() const1037 int atransport::get_protocol_version() const {
1038     return protocol_version;
1039 }
1040 
get_max_payload() const1041 size_t atransport::get_max_payload() const {
1042     return max_payload;
1043 }
1044 
supported_features()1045 const FeatureSet& supported_features() {
1046     // Local static allocation to avoid global non-POD variables.
1047     static const FeatureSet* features = new FeatureSet{
1048             kFeatureShell2,
1049             kFeatureCmd,
1050             kFeatureStat2,
1051             kFeatureFixedPushMkdir,
1052             kFeatureApex,
1053             kFeatureAbb,
1054             kFeatureFixedPushSymlinkTimestamp,
1055             kFeatureAbbExec,
1056             // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1057             // to know about. Otherwise, the client can be stuck running an old
1058             // version of the server even after upgrading their copy of adb.
1059             // (http://b/24370690)
1060     };
1061 
1062     return *features;
1063 }
1064 
FeatureSetToString(const FeatureSet & features)1065 std::string FeatureSetToString(const FeatureSet& features) {
1066     return android::base::Join(features, ',');
1067 }
1068 
StringToFeatureSet(const std::string & features_string)1069 FeatureSet StringToFeatureSet(const std::string& features_string) {
1070     if (features_string.empty()) {
1071         return FeatureSet();
1072     }
1073 
1074     auto names = android::base::Split(features_string, ",");
1075     return FeatureSet(names.begin(), names.end());
1076 }
1077 
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1078 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1079     return feature_set.count(feature) > 0 && supported_features().count(feature) > 0;
1080 }
1081 
has_feature(const std::string & feature) const1082 bool atransport::has_feature(const std::string& feature) const {
1083     return features_.count(feature) > 0;
1084 }
1085 
SetFeatures(const std::string & features_string)1086 void atransport::SetFeatures(const std::string& features_string) {
1087     features_ = StringToFeatureSet(features_string);
1088 }
1089 
AddDisconnect(adisconnect * disconnect)1090 void atransport::AddDisconnect(adisconnect* disconnect) {
1091     disconnects_.push_back(disconnect);
1092 }
1093 
RemoveDisconnect(adisconnect * disconnect)1094 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1095     disconnects_.remove(disconnect);
1096 }
1097 
RunDisconnects()1098 void atransport::RunDisconnects() {
1099     for (const auto& disconnect : disconnects_) {
1100         disconnect->func(disconnect->opaque, this);
1101     }
1102     disconnects_.clear();
1103 }
1104 
MatchesTarget(const std::string & target) const1105 bool atransport::MatchesTarget(const std::string& target) const {
1106     if (!serial.empty()) {
1107         if (target == serial) {
1108             return true;
1109         } else if (type == kTransportLocal) {
1110             // Local transports can match [tcp:|udp:]<hostname>[:port].
1111             const char* local_target_ptr = target.c_str();
1112 
1113             // For fastboot compatibility, ignore protocol prefixes.
1114             if (android::base::StartsWith(target, "tcp:") ||
1115                 android::base::StartsWith(target, "udp:")) {
1116                 local_target_ptr += 4;
1117             }
1118 
1119             // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1120             std::string serial_host, error;
1121             int serial_port = -1;
1122             if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr, &error)) {
1123                 // |target| may omit the port to default to ours.
1124                 std::string target_host;
1125                 int target_port = serial_port;
1126                 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1127                                                    nullptr, &error) &&
1128                     serial_host == target_host && serial_port == target_port) {
1129                     return true;
1130                 }
1131             }
1132         }
1133     }
1134 
1135     return (target == devpath) || qual_match(target, "product:", product, false) ||
1136            qual_match(target, "model:", model, true) ||
1137            qual_match(target, "device:", device, false);
1138 }
1139 
SetConnectionEstablished(bool success)1140 void atransport::SetConnectionEstablished(bool success) {
1141     connection_waitable_->SetConnectionEstablished(success);
1142 }
1143 
Reconnect()1144 ReconnectResult atransport::Reconnect() {
1145     return reconnect_(this);
1146 }
1147 
1148 #if ADB_HOST
1149 
// We use newline as our delimiter, make sure to never output it.
//
// Returns a copy of |str| with unwanted characters replaced by '_':
//   - when |alphanumeric| is true, every character for which isalnum() is false,
//   - otherwise, only '\n'.
static std::string sanitize(std::string str, bool alphanumeric) {
    const auto must_replace = [alphanumeric](char c) -> bool {
        if (!alphanumeric) {
            return c == '\n';
        }
        // isalnum() is only defined for EOF and values representable as unsigned char; a plain
        // char holding a non-ASCII byte may be negative, so convert before the call.
        return !isalnum(static_cast<unsigned char>(c));
    };
    std::replace_if(str.begin(), str.end(), must_replace, '_');
    return str;
}
1157 
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1158 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1159                                   bool alphanumeric) {
1160     if (value.empty()) {
1161         return;
1162     }
1163 
1164     *result += ' ';
1165     *result += key;
1166     *result += sanitize(value, alphanumeric);
1167 }
1168 
append_transport(const atransport * t,std::string * result,bool long_listing)1169 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1170     std::string serial = t->serial;
1171     if (serial.empty()) {
1172         serial = "(no serial number)";
1173     }
1174 
1175     if (!long_listing) {
1176         *result += serial;
1177         *result += '\t';
1178         *result += t->connection_state_name();
1179     } else {
1180         android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1181                                      t->connection_state_name().c_str());
1182 
1183         append_transport_info(result, "", t->devpath, false);
1184         append_transport_info(result, "product:", t->product, false);
1185         append_transport_info(result, "model:", t->model, true);
1186         append_transport_info(result, "device:", t->device, false);
1187 
1188         // Put id at the end, so that anyone parsing the output here can always find it by scanning
1189         // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1190         *result += " transport_id:";
1191         *result += std::to_string(t->id);
1192     }
1193     *result += '\n';
1194 }
1195 
list_transports(bool long_listing)1196 std::string list_transports(bool long_listing) {
1197     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1198 
1199     auto sorted_transport_list = transport_list;
1200     sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1201         if (x->type != y->type) {
1202             return x->type < y->type;
1203         }
1204         return x->serial < y->serial;
1205     });
1206 
1207     std::string result;
1208     for (const auto& t : sorted_transport_list) {
1209         append_transport(t, &result, long_listing);
1210     }
1211     return result;
1212 }
1213 
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1214 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1215     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1216     for (auto& t : transport_list) {
1217         if (predicate(t)) {
1218             if (reset) {
1219                 t->Reset();
1220             } else {
1221                 t->Kick();
1222             }
1223         }
1224     }
1225 }
1226 
1227 /* hack for osx */
close_usb_devices(bool reset)1228 void close_usb_devices(bool reset) {
1229     close_usb_devices([](const atransport*) { return true; }, reset);
1230 }
1231 #endif  // ADB_HOST
1232 
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,int * error)1233 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1234                                atransport::ReconnectCallback reconnect, int* error) {
1235     atransport* t = new atransport(std::move(reconnect), kCsOffline);
1236 
1237     D("transport: %s init'ing for socket %d, on port %d", serial.c_str(), s.get(), port);
1238     if (init_socket_transport(t, std::move(s), port, local) < 0) {
1239         delete t;
1240         if (error) *error = errno;
1241         return false;
1242     }
1243 
1244     std::unique_lock<std::recursive_mutex> lock(transport_lock);
1245     for (const auto& transport : pending_list) {
1246         if (serial == transport->serial) {
1247             VLOG(TRANSPORT) << "socket transport " << transport->serial
1248                             << " is already in pending_list and fails to register";
1249             delete t;
1250             if (error) *error = EALREADY;
1251             return false;
1252         }
1253     }
1254 
1255     for (const auto& transport : transport_list) {
1256         if (serial == transport->serial) {
1257             VLOG(TRANSPORT) << "socket transport " << transport->serial
1258                             << " is already in transport_list and fails to register";
1259             delete t;
1260             if (error) *error = EALREADY;
1261             return false;
1262         }
1263     }
1264 
1265     t->serial = std::move(serial);
1266     pending_list.push_front(t);
1267 
1268     lock.unlock();
1269 
1270     auto waitable = t->connection_waitable();
1271     register_transport(t);
1272 
1273     if (local == 1) {
1274         // Do not wait for emulator transports.
1275         return true;
1276     }
1277 
1278     if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1279         if (error) *error = ETIMEDOUT;
1280         return false;
1281     }
1282 
1283     if (t->GetConnectionState() == kCsUnauthorized) {
1284         if (error) *error = EPERM;
1285         return false;
1286     }
1287 
1288     return true;
1289 }
1290 
1291 #if ADB_HOST
find_transport(const char * serial)1292 atransport* find_transport(const char* serial) {
1293     atransport* result = nullptr;
1294 
1295     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1296     for (auto& t : transport_list) {
1297         if (strcmp(serial, t->serial.c_str()) == 0) {
1298             result = t;
1299             break;
1300         }
1301     }
1302 
1303     return result;
1304 }
1305 
kick_all_tcp_devices()1306 void kick_all_tcp_devices() {
1307     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1308     for (auto& t : transport_list) {
1309         if (t->IsTcpDevice()) {
1310             // Kicking breaks the read_transport thread of this transport out of any read, then
1311             // the read_transport thread will notify the main thread to make this transport
1312             // offline. Then the main thread will notify the write_transport thread to exit.
1313             // Finally, this transport will be closed and freed in the main thread.
1314             t->Kick();
1315         }
1316     }
1317 #if ADB_HOST
1318     reconnect_handler.CheckForKicked();
1319 #endif
1320 }
1321 
1322 #endif
1323 
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1324 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1325                             unsigned writeable) {
1326     atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1327 
1328     D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1329     init_usb_transport(t, usb);
1330     if (serial) {
1331         t->serial = serial;
1332     }
1333 
1334     if (devpath) {
1335         t->devpath = devpath;
1336     }
1337 
1338     {
1339         std::lock_guard<std::recursive_mutex> lock(transport_lock);
1340         pending_list.push_front(t);
1341     }
1342 
1343     register_transport(t);
1344 }
1345 
1346 #if ADB_HOST
1347 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1348 void unregister_usb_transport(usb_handle* usb) {
1349     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1350     transport_list.remove_if([usb](atransport* t) {
1351         return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1352     });
1353 }
1354 #endif
1355 
check_header(apacket * p,atransport * t)1356 bool check_header(apacket* p, atransport* t) {
1357     if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1358         VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1359                   << ", magic = " << p->msg.magic;
1360         return false;
1361     }
1362 
1363     if (p->msg.data_length > t->get_max_payload()) {
1364         VLOG(RWX) << "check_header(): " << p->msg.data_length
1365                   << " atransport::max_payload = " << t->get_max_payload();
1366         return false;
1367     }
1368 
1369     return true;
1370 }
1371 
1372 #if ADB_HOST
NextKey()1373 std::shared_ptr<RSA> atransport::NextKey() {
1374     if (keys_.empty()) {
1375         LOG(INFO) << "fetching keys for transport " << this->serial_name();
1376         keys_ = adb_auth_get_private_keys();
1377 
1378         // We should have gotten at least one key: the one that's automatically generated.
1379         CHECK(!keys_.empty());
1380     }
1381 
1382     std::shared_ptr<RSA> result = keys_[0];
1383     keys_.pop_front();
1384     return result;
1385 }
1386 
ResetKeys()1387 void atransport::ResetKeys() {
1388     keys_.clear();
1389 }
1390 #endif
1391