• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Encapsulates exchange protocol between the emulator, and an Android device
19  * that is connected to the host via USB. The communication is established over
20  * a TCP port forwarding, enabled by ADB.
21  */
22 
23 #include "android/utils/debug.h"
24 #include "android/async-socket-connector.h"
25 #include "android/async-socket.h"
26 #include "utils/panic.h"
27 #include "iolooper.h"
28 
29 #define  E(...)    derror(__VA_ARGS__)
30 #define  W(...)    dwarning(__VA_ARGS__)
31 #define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
32 #define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)
33 
34 #define TRACE_ON    0
35 
36 #if TRACE_ON
37 #define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
38 #else
39 #define  T(...)
40 #endif
41 
42 /********************************************************************************
43  *                  Asynchronous Socket internal API declarations
44  *******************************************************************************/
45 
46 /* Gets socket's address string. */
47 static const char* _async_socket_string(AsyncSocket* as);
48 
49 /* Gets socket's looper. */
50 static Looper* _async_socket_get_looper(AsyncSocket* as);
51 
52 /* Handler for the I/O time out.
53  * Param:
54  *  as - Asynchronous socket for the I/O.
55  *  asio - Desciptor for the timed out I/O.
56  */
57 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
58                                                 AsyncSocketIO* asio);
59 
60 /********************************************************************************
61  *                  Asynchronous Socket Reader / Writer
62  *******************************************************************************/
63 
64 struct AsyncSocketIO {
65     /* Next I/O in the reader, or writer list. */
66     AsyncSocketIO*      next;
67     /* Asynchronous socket for this I/O. */
68     AsyncSocket*        as;
69     /* Timer used for time outs on this I/O. */
70     LoopTimer           timer[1];
71     /* An opaque pointer associated with this I/O. */
72     void*               io_opaque;
73     /* Buffer where to read / write data. */
74     uint8_t*            buffer;
75     /* Bytes to transfer through the socket for this I/O. */
76     uint32_t            to_transfer;
77     /* Bytes thransferred through the socket in this I/O. */
78     uint32_t            transferred;
79     /* I/O callback for this I/O. */
80     on_as_io_cb         on_io;
81     /* I/O type selector: 1 - read, 0 - write. */
82     int                 is_io_read;
83     /* State of the I/O. */
84     AsyncIOState        state;
85     /* Number of outstanding references to the I/O. */
86     int                 ref_count;
87     /* Deadline for this I/O */
88     Duration            deadline;
89 };
90 
91 /*
92  * Recycling I/O instances.
93  * Since AsyncSocketIO instances are not that large, it makes sence to recycle
94  * them for faster allocation, rather than allocating and freeing them for each
95  * I/O on the socket.
96  */
97 
98 /* List of recycled I/O descriptors. */
99 static AsyncSocketIO* _asio_recycled    = NULL;
100 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
101 static int _recycled_asio_count         = 0;
102 /* Maximum number of I/O descriptors that can be recycled. */
103 static const int _max_recycled_asio_num = 32;
104 
105 /* Handler for an I/O time-out timer event.
106  * When this routine is invoked, it indicates that a time out has occurred on an
107  * I/O.
108  * Param:
109  *  opaque - AsyncSocketIO instance representing the timed out I/O.
110  */
111 static void _on_async_socket_io_timed_out(void* opaque);
112 
113 /* Creates new I/O descriptor.
114  * Param:
115  *  as - Asynchronous socket for the I/O.
116  *  is_io_read - I/O type selector: 1 - read, 0 - write.
117  *  buffer, len - Reader / writer buffer address.
118  *  io_cb - Callback for this reader / writer.
119  *  io_opaque - An opaque pointer associated with the I/O.
120  *  deadline - Deadline to complete the I/O.
121  * Return:
122  *  Initialized AsyncSocketIO instance.
123  */
124 static AsyncSocketIO*
_async_socket_rw_new(AsyncSocket * as,int is_io_read,void * buffer,uint32_t len,on_as_io_cb io_cb,void * io_opaque,Duration deadline)125 _async_socket_rw_new(AsyncSocket* as,
126                      int is_io_read,
127                      void* buffer,
128                      uint32_t len,
129                      on_as_io_cb io_cb,
130                      void* io_opaque,
131                      Duration deadline)
132 {
133     /* Lookup in the recycler first. */
134     AsyncSocketIO* asio = _asio_recycled;
135     if (asio != NULL) {
136         /* Pull the descriptor from recycler. */
137         _asio_recycled = asio->next;
138         _recycled_asio_count--;
139     } else {
140         /* No recycled descriptors. Allocate new one. */
141         ANEW0(asio);
142     }
143 
144     asio->next          = NULL;
145     asio->as            = as;
146     asio->is_io_read    = is_io_read;
147     asio->buffer        = (uint8_t*)buffer;
148     asio->to_transfer   = len;
149     asio->transferred   = 0;
150     asio->on_io         = io_cb;
151     asio->io_opaque     = io_opaque;
152     asio->state         = ASIO_STATE_QUEUED;
153     asio->ref_count     = 1;
154     asio->deadline      = deadline;
155     loopTimer_init(asio->timer, _async_socket_get_looper(as),
156                    _on_async_socket_io_timed_out, asio);
157     loopTimer_startAbsolute(asio->timer, deadline);
158 
159     /* Reference socket that is holding this I/O. */
160     async_socket_reference(as);
161 
162     T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
163       _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
164 
165     return asio;
166 }
167 
168 /* Destroys and frees I/O descriptor. */
169 static void
_async_socket_io_free(AsyncSocketIO * asio)170 _async_socket_io_free(AsyncSocketIO* asio)
171 {
172     AsyncSocket* const as = asio->as;
173 
174     T("ASocket %s: %s I/O descriptor %p is destroyed.",
175       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
176 
177     loopTimer_done(asio->timer);
178 
179     /* Try to recycle it first, and free the memory if recycler is full. */
180     if (_recycled_asio_count < _max_recycled_asio_num) {
181         asio->next = _asio_recycled;
182         _asio_recycled = asio;
183         _recycled_asio_count++;
184     } else {
185         AFREE(asio);
186     }
187 
188     /* Release socket that is holding this I/O. */
189     async_socket_release(as);
190 }
191 
192 /* An I/O has been finished and its descriptor is about to be discarded. */
193 static void
_async_socket_io_finished(AsyncSocketIO * asio)194 _async_socket_io_finished(AsyncSocketIO* asio)
195 {
196     /* Notify the client of the I/O that I/O is finished. */
197     asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
198 }
199 
200 int
async_socket_io_reference(AsyncSocketIO * asio)201 async_socket_io_reference(AsyncSocketIO* asio)
202 {
203     assert(asio->ref_count > 0);
204     asio->ref_count++;
205     return asio->ref_count;
206 }
207 
208 int
async_socket_io_release(AsyncSocketIO * asio)209 async_socket_io_release(AsyncSocketIO* asio)
210 {
211     assert(asio->ref_count > 0);
212     asio->ref_count--;
213     if (asio->ref_count == 0) {
214         _async_socket_io_finished(asio);
215         /* Last reference has been dropped. Destroy this object. */
216         _async_socket_io_free(asio);
217         return 0;
218     }
219     return asio->ref_count;
220 }
221 
222 /* Creates new asynchronous socket reader.
223  * Param:
224  *  as - Asynchronous socket for the reader.
225  *  buffer, len - Reader's buffer.
226  *  io_cb - Reader's callback.
227  *  reader_opaque - An opaque pointer associated with the reader.
228  *  deadline - Deadline to complete the operation.
229  * Return:
230  *  An initialized AsyncSocketIO intance.
231  */
232 static AsyncSocketIO*
_async_socket_reader_new(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb io_cb,void * reader_opaque,Duration deadline)233 _async_socket_reader_new(AsyncSocket* as,
234                          void* buffer,
235                          uint32_t len,
236                          on_as_io_cb io_cb,
237                          void* reader_opaque,
238                          Duration deadline)
239 {
240     AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
241                                                      reader_opaque, deadline);
242     return asio;
243 }
244 
245 /* Creates new asynchronous socket writer.
246  * Param:
247  *  as - Asynchronous socket for the writer.
248  *  buffer, len - Writer's buffer.
249  *  io_cb - Writer's callback.
250  *  writer_opaque - An opaque pointer associated with the writer.
251  *  deadline - Deadline to complete the operation.
252  * Return:
253  *  An initialized AsyncSocketIO intance.
254  */
255 static AsyncSocketIO*
_async_socket_writer_new(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb io_cb,void * writer_opaque,Duration deadline)256 _async_socket_writer_new(AsyncSocket* as,
257                          const void* buffer,
258                          uint32_t len,
259                          on_as_io_cb io_cb,
260                          void* writer_opaque,
261                          Duration deadline)
262 {
263     AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
264                                                      io_cb, writer_opaque,
265                                                      deadline);
266     return asio;
267 }
268 
269 /* I/O timed out. */
270 static void
_on_async_socket_io_timed_out(void * opaque)271 _on_async_socket_io_timed_out(void* opaque)
272 {
273     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
274     AsyncSocket* const as = asio->as;
275 
276     D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
277       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
278       asio->deadline, async_socket_deadline(as, 0));
279 
280     /* Reference while in callback. */
281     async_socket_io_reference(asio);
282     _async_socket_io_timed_out(asio->as, asio);
283     async_socket_io_release(asio);
284 }
285 
286 /********************************************************************************
287  *                 Public Asynchronous Socket I/O API
288  *******************************************************************************/
289 
290 AsyncSocket*
async_socket_io_get_socket(const AsyncSocketIO * asio)291 async_socket_io_get_socket(const AsyncSocketIO* asio)
292 {
293     async_socket_reference(asio->as);
294     return asio->as;
295 }
296 
297 void
async_socket_io_cancel_time_out(AsyncSocketIO * asio)298 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
299 {
300     loopTimer_stop(asio->timer);
301 }
302 
303 void*
async_socket_io_get_io_opaque(const AsyncSocketIO * asio)304 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
305 {
306     return asio->io_opaque;
307 }
308 
309 void*
async_socket_io_get_client_opaque(const AsyncSocketIO * asio)310 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
311 {
312     return async_socket_get_client_opaque(asio->as);
313 }
314 
315 void*
async_socket_io_get_buffer_info(const AsyncSocketIO * asio,uint32_t * transferred,uint32_t * to_transfer)316 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
317                                 uint32_t* transferred,
318                                 uint32_t* to_transfer)
319 {
320     if (transferred != NULL) {
321         *transferred = asio->transferred;
322     }
323     if (to_transfer != NULL) {
324         *to_transfer = asio->to_transfer;
325     }
326     return asio->buffer;
327 }
328 
329 void*
async_socket_io_get_buffer(const AsyncSocketIO * asio)330 async_socket_io_get_buffer(const AsyncSocketIO* asio)
331 {
332     return asio->buffer;
333 }
334 
335 uint32_t
async_socket_io_get_transferred(const AsyncSocketIO * asio)336 async_socket_io_get_transferred(const AsyncSocketIO* asio)
337 {
338     return asio->transferred;
339 }
340 
341 uint32_t
async_socket_io_get_to_transfer(const AsyncSocketIO * asio)342 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
343 {
344     return asio->to_transfer;
345 }
346 
347 int
async_socket_io_is_read(const AsyncSocketIO * asio)348 async_socket_io_is_read(const AsyncSocketIO* asio)
349 {
350     return asio->is_io_read;
351 }
352 
353 /********************************************************************************
354  *                      Asynchronous Socket internals
355  *******************************************************************************/
356 
357 struct AsyncSocket {
358     /* TCP address for the socket. */
359     SockAddress         address;
360     /* Connection callback for this socket. */
361     on_as_connection_cb on_connection;
362     /* An opaque pointer associated with this socket by the client. */
363     void*               client_opaque;
364     /* I/O looper for asynchronous I/O on the socket. */
365     Looper*             looper;
366     /* I/O descriptor for asynchronous I/O on the socket. */
367     LoopIo              io[1];
368     /* Timer to use for reconnection attempts. */
369     LoopTimer           reconnect_timer[1];
370     /* Head of the list of the active readers. */
371     AsyncSocketIO*      readers_head;
372     /* Tail of the list of the active readers. */
373     AsyncSocketIO*      readers_tail;
374     /* Head of the list of the active writers. */
375     AsyncSocketIO*      writers_head;
376     /* Tail of the list of the active writers. */
377     AsyncSocketIO*      writers_tail;
378     /* Socket's file descriptor. */
379     int                 fd;
380     /* Timeout to use for reconnection attempts. */
381     int                 reconnect_to;
382     /* Number of outstanding references to the socket. */
383     int                 ref_count;
384     /* Flags whether (1) or not (0) socket owns the looper. */
385     int                 owns_looper;
386 };
387 
388 static const char*
_async_socket_string(AsyncSocket * as)389 _async_socket_string(AsyncSocket* as)
390 {
391     return sock_address_to_string(&as->address);
392 }
393 
394 static Looper*
_async_socket_get_looper(AsyncSocket * as)395 _async_socket_get_looper(AsyncSocket* as)
396 {
397     return as->looper;
398 }
399 
400 /* Pulls first reader out of the list.
401  * Param:
402  *  as - Initialized AsyncSocket instance.
403  * Return:
404  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
405  *  Note that the caller is responsible for releasing the I/O object returned
406  *  from this routine.
407  */
408 static AsyncSocketIO*
_async_socket_pull_first_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)409 _async_socket_pull_first_io(AsyncSocket* as,
410                             AsyncSocketIO** list_head,
411                             AsyncSocketIO** list_tail)
412 {
413     AsyncSocketIO* const ret = *list_head;
414     if (ret != NULL) {
415         *list_head = ret->next;
416         ret->next = NULL;
417         if (*list_head == NULL) {
418             *list_tail = NULL;
419         }
420     }
421     return ret;
422 }
423 
424 /* Pulls first reader out of the list.
425  * Param:
426  *  as - Initialized AsyncSocket instance.
427  * Return:
428  *  First reader pulled out of the list, or NULL if there are no readers in the
429  *  list.
430  *  Note that the caller is responsible for releasing the I/O object returned
431  *  from this routine.
432  */
433 static AsyncSocketIO*
_async_socket_pull_first_reader(AsyncSocket * as)434 _async_socket_pull_first_reader(AsyncSocket* as)
435 {
436     return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
437 }
438 
439 /* Pulls first writer out of the list.
440  * Param:
441  *  as - Initialized AsyncSocket instance.
442  * Return:
443  *  First writer pulled out of the list, or NULL if there are no writers in the
444  *  list.
445  *  Note that the caller is responsible for releasing the I/O object returned
446  *  from this routine.
447  */
448 static AsyncSocketIO*
_async_socket_pull_first_writer(AsyncSocket * as)449 _async_socket_pull_first_writer(AsyncSocket* as)
450 {
451     return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
452 }
453 
454 /* Removes an I/O descriptor from a list of active I/O.
455  * Param:
456  *  as - Initialized AsyncSocket instance.
457  *  list_head, list_tail - Pointers to the list head and tail.
458  *  io - I/O to remove.
459  * Return:
460  *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
461  */
462 static int
_async_socket_remove_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail,AsyncSocketIO * io)463 _async_socket_remove_io(AsyncSocket* as,
464                         AsyncSocketIO** list_head,
465                         AsyncSocketIO** list_tail,
466                         AsyncSocketIO* io)
467 {
468     AsyncSocketIO* prev = NULL;
469 
470     while (*list_head != NULL && io != *list_head) {
471         prev = *list_head;
472         list_head = &((*list_head)->next);
473     }
474     if (*list_head == NULL) {
475         D("%s: I/O %p is not found in the list for socket '%s'",
476           __FUNCTION__, io, _async_socket_string(as));
477         return 0;
478     }
479 
480     *list_head = io->next;
481     if (prev != NULL) {
482         prev->next = io->next;
483     }
484     if (*list_tail == io) {
485         *list_tail = prev;
486     }
487 
488     /* Release I/O adjusting reference added when I/O has been saved in the list. */
489     async_socket_io_release(io);
490 
491     return 1;
492 }
493 
494 /* Advances to the next I/O in the list.
495  * Param:
496  *  as - Initialized AsyncSocket instance.
497  *  list_head, list_tail - Pointers to the list head and tail.
498  */
499 static void
_async_socket_advance_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)500 _async_socket_advance_io(AsyncSocket* as,
501                          AsyncSocketIO** list_head,
502                          AsyncSocketIO** list_tail)
503 {
504     AsyncSocketIO* first_io = *list_head;
505     if (first_io != NULL) {
506         *list_head = first_io->next;
507         first_io->next = NULL;
508     }
509     if (*list_head == NULL) {
510         *list_tail = NULL;
511     }
512     if (first_io != NULL) {
513         /* Release I/O removed from the head of the list. */
514         async_socket_io_release(first_io);
515     }
516 }
517 
518 /* Advances to the next reader in the list.
519  * Param:
520  *  as - Initialized AsyncSocket instance.
521  */
522 static void
_async_socket_advance_reader(AsyncSocket * as)523 _async_socket_advance_reader(AsyncSocket* as)
524 {
525     _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
526 }
527 
528 /* Advances to the next writer in the list.
529  * Param:
530  *  as - Initialized AsyncSocket instance.
531  */
532 static void
_async_socket_advance_writer(AsyncSocket * as)533 _async_socket_advance_writer(AsyncSocket* as)
534 {
535     _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
536 }
537 
538 /* Completes an I/O.
539  * Param:
540  *  as - Initialized AsyncSocket instance.
541  *  asio - I/O to complete.
542  * Return:
543  *  One of AsyncIOAction values.
544  */
545 static AsyncIOAction
_async_socket_complete_io(AsyncSocket * as,AsyncSocketIO * asio)546 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
547 {
548     T("ASocket %s: %s I/O %p is completed.",
549       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
550 
551     /* Stop the timer. */
552     async_socket_io_cancel_time_out(asio);
553 
554     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
555 }
556 
557 /* Timeouts an I/O.
558  * Param:
559  *  as - Initialized AsyncSocket instance.
560  *  asio - An I/O that has timed out.
561  * Return:
562  *  One of AsyncIOAction values.
563  */
564 static AsyncIOAction
_async_socket_io_timed_out(AsyncSocket * as,AsyncSocketIO * asio)565 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
566 {
567     T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
568       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
569       asio->deadline, async_socket_deadline(as, 0));
570 
571     /* Report to the client. */
572     const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
573                                              ASIO_STATE_TIMED_OUT);
574 
575     /* Remove the I/O from a list of active I/O for actions other than retry. */
576     if (action != ASIO_ACTION_RETRY) {
577         if (asio->is_io_read) {
578             _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
579         } else {
580             _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
581         }
582     }
583 
584     return action;
585 }
586 
587 /* Cancels an I/O.
588  * Param:
589  *  as - Initialized AsyncSocket instance.
590  *  asio - An I/O to cancel.
591  * Return:
592  *  One of AsyncIOAction values.
593  */
594 static AsyncIOAction
_async_socket_cancel_io(AsyncSocket * as,AsyncSocketIO * asio)595 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
596 {
597     T("ASocket %s: %s I/O %p is cancelled.",
598       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
599 
600     /* Stop the timer. */
601     async_socket_io_cancel_time_out(asio);
602 
603     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
604 }
605 
606 /* Reports an I/O failure.
607  * Param:
608  *  as - Initialized AsyncSocket instance.
609  *  asio - An I/O that has failed. Can be NULL for general failures.
610  *  failure - Failure (errno) that has occurred.
611  * Return:
612  *  One of AsyncIOAction values.
613  */
614 static AsyncIOAction
_async_socket_io_failure(AsyncSocket * as,AsyncSocketIO * asio,int failure)615 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
616 {
617     T("ASocket %s: %s I/O %p has failed: %d -> %s",
618       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
619       failure, strerror(failure));
620 
621     /* Stop the timer. */
622     async_socket_io_cancel_time_out(asio);
623 
624     errno = failure;
625     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
626 }
627 
628 /* Cancels all the active socket readers.
629  * Param:
630  *  as - Initialized AsyncSocket instance.
631  */
632 static void
_async_socket_cancel_readers(AsyncSocket * as)633 _async_socket_cancel_readers(AsyncSocket* as)
634 {
635     while (as->readers_head != NULL) {
636         AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
637         /* We ignore action returned from the cancellation callback, since we're
638          * in a disconnected state here. */
639         _async_socket_cancel_io(as, to_cancel);
640         async_socket_io_release(to_cancel);
641     }
642 }
643 
644 /* Cancels all the active socket writers.
645  * Param:
646  *  as - Initialized AsyncSocket instance.
647  */
648 static void
_async_socket_cancel_writers(AsyncSocket * as)649 _async_socket_cancel_writers(AsyncSocket* as)
650 {
651     while (as->writers_head != NULL) {
652         AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
653         /* We ignore action returned from the cancellation callback, since we're
654          * in a disconnected state here. */
655         _async_socket_cancel_io(as, to_cancel);
656         async_socket_io_release(to_cancel);
657     }
658 }
659 
660 /* Cancels all the I/O on the socket. */
661 static void
_async_socket_cancel_all_io(AsyncSocket * as)662 _async_socket_cancel_all_io(AsyncSocket* as)
663 {
664     /* Stop the reconnection timer. */
665     loopTimer_stop(as->reconnect_timer);
666 
667     /* Stop read / write on the socket. */
668     loopIo_dontWantWrite(as->io);
669     loopIo_dontWantRead(as->io);
670 
671     /* Cancel active readers and writers. */
672     _async_socket_cancel_readers(as);
673     _async_socket_cancel_writers(as);
674 }
675 
676 /* Closes socket handle used by the async socket.
677  * Param:
678  *  as - Initialized AsyncSocket instance.
679  */
680 static void
_async_socket_close_socket(AsyncSocket * as)681 _async_socket_close_socket(AsyncSocket* as)
682 {
683     if (as->fd >= 0) {
684         T("ASocket %s: Socket handle %d is closed.",
685           _async_socket_string(as), as->fd);
686         loopIo_done(as->io);
687         socket_close(as->fd);
688         as->fd = -1;
689     }
690 }
691 
692 /* Destroys AsyncSocket instance.
693  * Param:
694  *  as - Initialized AsyncSocket instance.
695  */
696 static void
_async_socket_free(AsyncSocket * as)697 _async_socket_free(AsyncSocket* as)
698 {
699     if (as != NULL) {
700         T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
701 
702         /* Close socket. */
703         _async_socket_close_socket(as);
704 
705         /* Free allocated resources. */
706         if (as->looper != NULL) {
707             loopTimer_done(as->reconnect_timer);
708             if (as->owns_looper) {
709                 looper_free(as->looper);
710             }
711         }
712         sock_address_done(&as->address);
713         AFREE(as);
714     }
715 }
716 
717 /* Starts reconnection attempts after connection has been lost.
718  * Param:
719  *  as - Initialized AsyncSocket instance.
720  *  to - Milliseconds to wait before reconnection attempt.
721  */
722 static void
_async_socket_reconnect(AsyncSocket * as,int to)723 _async_socket_reconnect(AsyncSocket* as, int to)
724 {
725     T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
726 
727     /* Make sure that no I/O is active, and socket is closed before we
728      * reconnect. */
729     _async_socket_cancel_all_io(as);
730 
731     /* Set the timer for reconnection attempt. */
732     loopTimer_startRelative(as->reconnect_timer, to);
733 }
734 
735 /********************************************************************************
736  *                      Asynchronous Socket callbacks
737  *******************************************************************************/
738 
739 /* A callback that is invoked when socket gets disconnected.
740  * Param:
741  *  as - Initialized AsyncSocket instance.
742  */
743 static void
_on_async_socket_disconnected(AsyncSocket * as)744 _on_async_socket_disconnected(AsyncSocket* as)
745 {
746     /* Save error to restore it for the client's callback. */
747     const int save_errno = errno;
748     AsyncIOAction action = ASIO_ACTION_ABORT;
749 
750     D("ASocket %s: Disconnected.", _async_socket_string(as));
751 
752     /* Cancel all the I/O on this socket. */
753     _async_socket_cancel_all_io(as);
754 
755     /* Close the socket. */
756     _async_socket_close_socket(as);
757 
758     /* Restore errno, and invoke client's callback. */
759     errno = save_errno;
760     action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
761 
762     if (action == ASIO_ACTION_RETRY) {
763         /* Client requested reconnection. */
764         _async_socket_reconnect(as, as->reconnect_to);
765     }
766 }
767 
768 /* A callback that is invoked on socket's I/O failure.
769  * Param:
770  *  as - Initialized AsyncSocket instance.
771  *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
772  */
773 static AsyncIOAction
_on_async_socket_failure(AsyncSocket * as,AsyncSocketIO * asio)774 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
775 {
776     D("ASocket %s: %s I/O failure: %d -> %s",
777       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
778       errno, strerror(errno));
779 
780     /* Report the failure. */
781     return _async_socket_io_failure(as, asio, errno);
782 }
783 
784 /* A callback that is invoked when there is data available to read.
785  * Param:
786  *  as - Initialized AsyncSocket instance.
787  * Return:
788  *  0 on success, or -1 on failure. Failure returned from this routine will
789  *  skip writes (if awailable) behind this read.
790  */
791 static int
_on_async_socket_recv(AsyncSocket * as)792 _on_async_socket_recv(AsyncSocket* as)
793 {
794     AsyncIOAction action;
795 
796     /* Get current reader. */
797     AsyncSocketIO* const asr = as->readers_head;
798     if (asr == NULL) {
799         D("ASocket %s: No reader is available.", _async_socket_string(as));
800         loopIo_dontWantRead(as->io);
801         return 0;
802     }
803 
804     /* Reference the reader while we're working with it in this callback. */
805     async_socket_io_reference(asr);
806 
807     /* Bump I/O state, and inform the client that I/O is in progress. */
808     if (asr->state == ASIO_STATE_QUEUED) {
809         asr->state = ASIO_STATE_STARTED;
810     } else {
811         asr->state = ASIO_STATE_CONTINUES;
812     }
813     action = asr->on_io(asr->io_opaque, asr, asr->state);
814     if (action == ASIO_ACTION_ABORT) {
815         D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
816         /* Move on to the next reader. */
817         _async_socket_advance_reader(as);
818         /* Lets see if there are still active readers, and enable, or disable
819          * read I/O callback accordingly. */
820         if (as->readers_head != NULL) {
821             loopIo_wantRead(as->io);
822         } else {
823             loopIo_dontWantRead(as->io);
824         }
825         async_socket_io_release(asr);
826         return 0;
827     }
828 
829     /* Read next chunk of data. */
830     int res = socket_recv(as->fd, asr->buffer + asr->transferred,
831                           asr->to_transfer - asr->transferred);
832     while (res < 0 && errno == EINTR) {
833         res = socket_recv(as->fd, asr->buffer + asr->transferred,
834                           asr->to_transfer - asr->transferred);
835     }
836 
837     if (res == 0) {
838         /* Socket has been disconnected. */
839         errno = ECONNRESET;
840         _on_async_socket_disconnected(as);
841         async_socket_io_release(asr);
842         return -1;
843     }
844 
845     if (res < 0) {
846         if (errno == EWOULDBLOCK || errno == EAGAIN) {
847             /* Yield to writes behind this read. */
848             loopIo_wantRead(as->io);
849             async_socket_io_release(asr);
850             return 0;
851         }
852 
853         /* An I/O error. */
854         action = _on_async_socket_failure(as, asr);
855         if (action != ASIO_ACTION_RETRY) {
856             D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
857             /* Move on to the next reader. */
858             _async_socket_advance_reader(as);
859             /* Lets see if there are still active readers, and enable, or disable
860              * read I/O callback accordingly. */
861             if (as->readers_head != NULL) {
862                 loopIo_wantRead(as->io);
863             } else {
864                 loopIo_dontWantRead(as->io);
865             }
866         }
867         async_socket_io_release(asr);
868         return -1;
869     }
870 
871     /* Update the reader's descriptor. */
872     asr->transferred += res;
873     if (asr->transferred == asr->to_transfer) {
874         /* This read is completed. Move on to the next reader. */
875         _async_socket_advance_reader(as);
876 
877         /* Notify reader completion. */
878         _async_socket_complete_io(as, asr);
879     }
880 
881     /* Lets see if there are still active readers, and enable, or disable read
882      * I/O callback accordingly. */
883     if (as->readers_head != NULL) {
884         loopIo_wantRead(as->io);
885     } else {
886         loopIo_dontWantRead(as->io);
887     }
888 
889     async_socket_io_release(asr);
890 
891     return 0;
892 }
893 
894 /* A callback that is invoked when there is data available to write.
895  * Param:
896  *  as - Initialized AsyncSocket instance.
897  * Return:
898  *  0 on success, or -1 on failure. Failure returned from this routine will
899  *  skip reads (if awailable) behind this write.
900  */
901 static int
_on_async_socket_send(AsyncSocket * as)902 _on_async_socket_send(AsyncSocket* as)
903 {
904     AsyncIOAction action;
905 
906     /* Get current writer. */
907     AsyncSocketIO* const asw = as->writers_head;
908     if (asw == NULL) {
909         D("ASocket %s: No writer is available.", _async_socket_string(as));
910         loopIo_dontWantWrite(as->io);
911         return 0;
912     }
913 
914     /* Reference the writer while we're working with it in this callback. */
915     async_socket_io_reference(asw);
916 
917     /* Bump I/O state, and inform the client that I/O is in progress. */
918     if (asw->state == ASIO_STATE_QUEUED) {
919         asw->state = ASIO_STATE_STARTED;
920     } else {
921         asw->state = ASIO_STATE_CONTINUES;
922     }
923     action = asw->on_io(asw->io_opaque, asw, asw->state);
924     if (action == ASIO_ACTION_ABORT) {
925         D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
926         /* Move on to the next writer. */
927         _async_socket_advance_writer(as);
928         /* Lets see if there are still active writers, and enable, or disable
929          * write I/O callback accordingly. */
930         if (as->writers_head != NULL) {
931             loopIo_wantWrite(as->io);
932         } else {
933             loopIo_dontWantWrite(as->io);
934         }
935         async_socket_io_release(asw);
936         return 0;
937     }
938 
939     /* Write next chunk of data. */
940     int res = socket_send(as->fd, asw->buffer + asw->transferred,
941                           asw->to_transfer - asw->transferred);
942     while (res < 0 && errno == EINTR) {
943         res = socket_send(as->fd, asw->buffer + asw->transferred,
944                           asw->to_transfer - asw->transferred);
945     }
946 
947     if (res == 0) {
948         /* Socket has been disconnected. */
949         errno = ECONNRESET;
950         _on_async_socket_disconnected(as);
951         async_socket_io_release(asw);
952         return -1;
953     }
954 
955     if (res < 0) {
956         if (errno == EWOULDBLOCK || errno == EAGAIN) {
957             /* Yield to reads behind this write. */
958             loopIo_wantWrite(as->io);
959             async_socket_io_release(asw);
960             return 0;
961         }
962 
963         /* An I/O error. */
964         action = _on_async_socket_failure(as, asw);
965         if (action != ASIO_ACTION_RETRY) {
966             D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
967             /* Move on to the next writer. */
968             _async_socket_advance_writer(as);
969             /* Lets see if there are still active writers, and enable, or disable
970              * write I/O callback accordingly. */
971             if (as->writers_head != NULL) {
972                 loopIo_wantWrite(as->io);
973             } else {
974                 loopIo_dontWantWrite(as->io);
975             }
976         }
977         async_socket_io_release(asw);
978         return -1;
979     }
980 
981     /* Update the writer descriptor. */
982     asw->transferred += res;
983     if (asw->transferred == asw->to_transfer) {
984         /* This write is completed. Move on to the next writer. */
985         _async_socket_advance_writer(as);
986 
987         /* Notify writer completion. */
988         _async_socket_complete_io(as, asw);
989     }
990 
991     /* Lets see if there are still active writers, and enable, or disable write
992      * I/O callback accordingly. */
993     if (as->writers_head != NULL) {
994         loopIo_wantWrite(as->io);
995     } else {
996         loopIo_dontWantWrite(as->io);
997     }
998 
999     async_socket_io_release(asw);
1000 
1001     return 0;
1002 }
1003 
1004 /* A callback that is invoked when an I/O is available on socket.
1005  * Param:
1006  *  as - Initialized AsyncSocket instance.
1007  *  fd - Socket's file descriptor.
1008  *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1009  */
1010 static void
_on_async_socket_io(void * opaque,int fd,unsigned events)1011 _on_async_socket_io(void* opaque, int fd, unsigned events)
1012 {
1013     AsyncSocket* const as = (AsyncSocket*)opaque;
1014 
1015     /* Reference the socket while we're working with it in this callback. */
1016     async_socket_reference(as);
1017 
1018     if ((events & LOOP_IO_READ) != 0) {
1019         if (_on_async_socket_recv(as) != 0) {
1020             async_socket_release(as);
1021             return;
1022         }
1023     }
1024 
1025     if ((events & LOOP_IO_WRITE) != 0) {
1026         if (_on_async_socket_send(as) != 0) {
1027             async_socket_release(as);
1028             return;
1029         }
1030     }
1031 
1032     async_socket_release(as);
1033 }
1034 
1035 /* A callback that is invoked by asynchronous socket connector on connection
1036  *  events.
1037  * Param:
1038  *  opaque - Initialized AsyncSocket instance.
1039  *  connector - Connector that is used to connect this socket.
1040  *  event - Connection event.
1041  * Return:
1042  *  One of AsyncIOAction values.
1043  */
1044 static AsyncIOAction
_on_connector_events(void * opaque,AsyncSocketConnector * connector,AsyncIOState event)1045 _on_connector_events(void* opaque,
1046                      AsyncSocketConnector* connector,
1047                      AsyncIOState event)
1048 {
1049     AsyncIOAction action;
1050     AsyncSocket* const as = (AsyncSocket*)opaque;
1051 
1052     /* Reference the socket while we're working with it in this callback. */
1053     async_socket_reference(as);
1054 
1055     if (event == ASIO_STATE_SUCCEEDED) {
1056         /* Accept the connection. */
1057         as->fd = async_socket_connector_pull_fd(connector);
1058         loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
1059     }
1060 
1061     /* Invoke client's callback. */
1062     action = as->on_connection(as->client_opaque, as, event);
1063     if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
1064         /* For whatever reason the client didn't want to keep this connection.
1065          * Close it. */
1066         D("ASocket %s: Connection is discarded by the client.",
1067           _async_socket_string(as));
1068         _async_socket_close_socket(as);
1069     }
1070 
1071     if (action != ASIO_ACTION_RETRY) {
1072         async_socket_connector_release(connector);
1073     }
1074 
1075     async_socket_release(as);
1076 
1077     return action;
1078 }
1079 
1080 /* Timer callback invoked to reconnect the lost connection.
1081  * Param:
1082  *  as - Initialized AsyncSocket instance.
1083  */
1084 void
_on_async_socket_reconnect(void * opaque)1085 _on_async_socket_reconnect(void* opaque)
1086 {
1087     AsyncSocket* as = (AsyncSocket*)opaque;
1088 
1089     /* Reference the socket while we're working with it in this callback. */
1090     async_socket_reference(as);
1091     async_socket_connect(as, as->reconnect_to);
1092     async_socket_release(as);
1093 }
1094 
1095 
1096 /********************************************************************************
1097  *                  Android Device Socket public API
1098  *******************************************************************************/
1099 
1100 AsyncSocket*
async_socket_new(int port,int reconnect_to,on_as_connection_cb client_cb,void * client_opaque,Looper * looper)1101 async_socket_new(int port,
1102                  int reconnect_to,
1103                  on_as_connection_cb client_cb,
1104                  void* client_opaque,
1105                  Looper* looper)
1106 {
1107     AsyncSocket* as;
1108 
1109     if (client_cb == NULL) {
1110         E("Invalid client_cb parameter");
1111         return NULL;
1112     }
1113 
1114     ANEW0(as);
1115 
1116     as->fd = -1;
1117     as->client_opaque = client_opaque;
1118     as->on_connection = client_cb;
1119     as->readers_head = as->readers_tail = NULL;
1120     as->reconnect_to = reconnect_to;
1121     as->ref_count = 1;
1122     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
1123     if (looper == NULL) {
1124         as->looper = looper_newCore();
1125         if (as->looper == NULL) {
1126             E("Unable to create I/O looper for async socket '%s'",
1127               _async_socket_string(as));
1128             client_cb(client_opaque, as, ASIO_STATE_FAILED);
1129             _async_socket_free(as);
1130             return NULL;
1131         }
1132         as->owns_looper = 1;
1133     } else {
1134         as->looper = looper;
1135         as->owns_looper = 0;
1136     }
1137 
1138     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1139 
1140     T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1141 
1142     return as;
1143 }
1144 
1145 int
async_socket_reference(AsyncSocket * as)1146 async_socket_reference(AsyncSocket* as)
1147 {
1148     assert(as->ref_count > 0);
1149     as->ref_count++;
1150     return as->ref_count;
1151 }
1152 
1153 int
async_socket_release(AsyncSocket * as)1154 async_socket_release(AsyncSocket* as)
1155 {
1156     assert(as->ref_count > 0);
1157     as->ref_count--;
1158     if (as->ref_count == 0) {
1159         /* Last reference has been dropped. Destroy this object. */
1160         _async_socket_cancel_all_io(as);
1161         _async_socket_free(as);
1162         return 0;
1163     }
1164     return as->ref_count;
1165 }
1166 
1167 void
async_socket_connect(AsyncSocket * as,int retry_to)1168 async_socket_connect(AsyncSocket* as, int retry_to)
1169 {
1170     T("ASocket %s: Handling connection request for %dms...",
1171       _async_socket_string(as), retry_to);
1172 
1173     AsyncSocketConnector* const connector =
1174         async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1175                                    as, as->looper);
1176     if (connector != NULL) {
1177         async_socket_connector_connect(connector);
1178     } else {
1179         as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
1180     }
1181 }
1182 
1183 void
async_socket_disconnect(AsyncSocket * as)1184 async_socket_disconnect(AsyncSocket* as)
1185 {
1186     T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1187 
1188     if (as != NULL) {
1189         _async_socket_cancel_all_io(as);
1190         _async_socket_close_socket(as);
1191     }
1192 }
1193 
1194 void
async_socket_reconnect(AsyncSocket * as,int retry_to)1195 async_socket_reconnect(AsyncSocket* as, int retry_to)
1196 {
1197     T("ASocket %s: Handling reconnection request for %dms...",
1198       _async_socket_string(as), retry_to);
1199 
1200     _async_socket_cancel_all_io(as);
1201     _async_socket_close_socket(as);
1202     _async_socket_reconnect(as, retry_to);
1203 }
1204 
1205 void
async_socket_read_abs(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,Duration deadline)1206 async_socket_read_abs(AsyncSocket* as,
1207                       void* buffer, uint32_t len,
1208                       on_as_io_cb reader_cb,
1209                       void* reader_opaque,
1210                       Duration deadline)
1211 {
1212     T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1213       _async_socket_string(as), len, deadline);
1214 
1215     AsyncSocketIO* const asr =
1216         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1217                                  deadline);
1218     if (async_socket_is_connected(as)) {
1219         /* Add new reader to the list. Note that we use initial reference from I/O
1220          * 'new' routine as "in the list" reference counter. */
1221         if (as->readers_head == NULL) {
1222             as->readers_head = as->readers_tail = asr;
1223         } else {
1224             as->readers_tail->next = asr;
1225             as->readers_tail = asr;
1226         }
1227         loopIo_wantRead(as->io);
1228     } else {
1229         D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
1230         errno = ECONNRESET;
1231         reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
1232         async_socket_io_release(asr);
1233     }
1234 }
1235 
1236 void
async_socket_read_rel(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,int to)1237 async_socket_read_rel(AsyncSocket* as,
1238                       void* buffer, uint32_t len,
1239                       on_as_io_cb reader_cb,
1240                       void* reader_opaque,
1241                       int to)
1242 {
1243     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1244                                     DURATION_INFINITE;
1245     async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
1246 }
1247 
1248 void
async_socket_write_abs(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,Duration deadline)1249 async_socket_write_abs(AsyncSocket* as,
1250                        const void* buffer, uint32_t len,
1251                        on_as_io_cb writer_cb,
1252                        void* writer_opaque,
1253                        Duration deadline)
1254 {
1255     T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1256       _async_socket_string(as), len, deadline);
1257 
1258     AsyncSocketIO* const asw =
1259         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1260                                  deadline);
1261     if (async_socket_is_connected(as)) {
1262         /* Add new writer to the list. Note that we use initial reference from I/O
1263          * 'new' routine as "in the list" reference counter. */
1264         if (as->writers_head == NULL) {
1265             as->writers_head = as->writers_tail = asw;
1266         } else {
1267             as->writers_tail->next = asw;
1268             as->writers_tail = asw;
1269         }
1270         loopIo_wantWrite(as->io);
1271     } else {
1272         D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
1273         errno = ECONNRESET;
1274         writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
1275         async_socket_io_release(asw);
1276     }
1277 }
1278 
1279 void
async_socket_write_rel(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,int to)1280 async_socket_write_rel(AsyncSocket* as,
1281                        const void* buffer, uint32_t len,
1282                        on_as_io_cb writer_cb,
1283                        void* writer_opaque,
1284                        int to)
1285 {
1286     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1287                                     DURATION_INFINITE;
1288     async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1289 }
1290 
1291 void*
async_socket_get_client_opaque(const AsyncSocket * as)1292 async_socket_get_client_opaque(const AsyncSocket* as)
1293 {
1294     return as->client_opaque;
1295 }
1296 
1297 Duration
async_socket_deadline(AsyncSocket * as,int rel)1298 async_socket_deadline(AsyncSocket* as, int rel)
1299 {
1300     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1301                         DURATION_INFINITE;
1302 }
1303 
1304 int
async_socket_get_port(const AsyncSocket * as)1305 async_socket_get_port(const AsyncSocket* as)
1306 {
1307     return sock_address_get_port(&as->address);
1308 }
1309 
1310 int
async_socket_is_connected(const AsyncSocket * as)1311 async_socket_is_connected(const AsyncSocket* as)
1312 {
1313     return as->fd >= 0;
1314 }
1315