• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Encapsulates exchange protocol between the emulator, and an Android device
19  * that is connected to the host via USB. The communication is established over
20  * a TCP port forwarding, enabled by ADB.
21  */
22 
23 #include "android/async-socket-connector.h"
24 #include "android/async-socket.h"
25 #include "android/utils/debug.h"
26 #include "android/utils/eintr_wrapper.h"
27 #include "android/utils/panic.h"
28 #include "android/iolooper.h"
29 
/* Logging shortcuts. E and W forward to derror / dwarning; D prints through
 * VERBOSE_PRINT using the 'asyncsocket' tag, and D_ACTIVE evaluates
 * VERBOSE_CHECK for the same tag. */
#define  E(...)    derror(__VA_ARGS__)
#define  W(...)    dwarning(__VA_ARGS__)
#define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)

/* Set to a non-zero value to compile the T(...) trace messages in. */
#define TRACE_ON    0

/* T(...) expands to a verbose print when tracing is on, and to nothing
 * otherwise. */
#if TRACE_ON
#define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#else
#define  T(...)
#endif
42 
43 /********************************************************************************
44  *                  Asynchronous Socket internal API declarations
45  *******************************************************************************/
46 
47 /* Gets socket's address string. */
48 static const char* _async_socket_string(AsyncSocket* as);
49 
50 /* Gets socket's looper. */
51 static Looper* _async_socket_get_looper(AsyncSocket* as);
52 
53 /* Handler for the I/O time out.
54  * Param:
55  *  as - Asynchronous socket for the I/O.
56  *  asio - Desciptor for the timed out I/O.
57  */
58 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
59                                                 AsyncSocketIO* asio);
60 
61 /********************************************************************************
62  *                  Asynchronous Socket Reader / Writer
63  *******************************************************************************/
64 
65 struct AsyncSocketIO {
66     /* Next I/O in the reader, or writer list. */
67     AsyncSocketIO*      next;
68     /* Asynchronous socket for this I/O. */
69     AsyncSocket*        as;
70     /* Timer used for time outs on this I/O. */
71     LoopTimer           timer[1];
72     /* An opaque pointer associated with this I/O. */
73     void*               io_opaque;
74     /* Buffer where to read / write data. */
75     uint8_t*            buffer;
76     /* Bytes to transfer through the socket for this I/O. */
77     uint32_t            to_transfer;
78     /* Bytes thransferred through the socket in this I/O. */
79     uint32_t            transferred;
80     /* I/O callback for this I/O. */
81     on_as_io_cb         on_io;
82     /* I/O type selector: 1 - read, 0 - write. */
83     int                 is_io_read;
84     /* State of the I/O. */
85     AsyncIOState        state;
86     /* Number of outstanding references to the I/O. */
87     int                 ref_count;
88     /* Deadline for this I/O */
89     Duration            deadline;
90 };
91 
92 /*
93  * Recycling I/O instances.
94  * Since AsyncSocketIO instances are not that large, it makes sence to recycle
95  * them for faster allocation, rather than allocating and freeing them for each
96  * I/O on the socket.
97  */
98 
99 /* List of recycled I/O descriptors. */
100 static AsyncSocketIO* _asio_recycled    = NULL;
101 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
102 static int _recycled_asio_count         = 0;
103 /* Maximum number of I/O descriptors that can be recycled. */
104 static const int _max_recycled_asio_num = 32;
105 
/* Timer callback for an I/O deadline.
 * It is registered with the I/O descriptor's timer, and is invoked when a time
 * out has occurred on that I/O.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
 */
static void _on_async_socket_io_timed_out(void* opaque);
113 
114 /* Creates new I/O descriptor.
115  * Param:
116  *  as - Asynchronous socket for the I/O.
117  *  is_io_read - I/O type selector: 1 - read, 0 - write.
118  *  buffer, len - Reader / writer buffer address.
119  *  io_cb - Callback for this reader / writer.
120  *  io_opaque - An opaque pointer associated with the I/O.
121  *  deadline - Deadline to complete the I/O.
122  * Return:
123  *  Initialized AsyncSocketIO instance.
124  */
125 static AsyncSocketIO*
_async_socket_rw_new(AsyncSocket * as,int is_io_read,void * buffer,uint32_t len,on_as_io_cb io_cb,void * io_opaque,Duration deadline)126 _async_socket_rw_new(AsyncSocket* as,
127                      int is_io_read,
128                      void* buffer,
129                      uint32_t len,
130                      on_as_io_cb io_cb,
131                      void* io_opaque,
132                      Duration deadline)
133 {
134     /* Lookup in the recycler first. */
135     AsyncSocketIO* asio = _asio_recycled;
136     if (asio != NULL) {
137         /* Pull the descriptor from recycler. */
138         _asio_recycled = asio->next;
139         _recycled_asio_count--;
140     } else {
141         /* No recycled descriptors. Allocate new one. */
142         ANEW0(asio);
143     }
144 
145     asio->next          = NULL;
146     asio->as            = as;
147     asio->is_io_read    = is_io_read;
148     asio->buffer        = (uint8_t*)buffer;
149     asio->to_transfer   = len;
150     asio->transferred   = 0;
151     asio->on_io         = io_cb;
152     asio->io_opaque     = io_opaque;
153     asio->state         = ASIO_STATE_QUEUED;
154     asio->ref_count     = 1;
155     asio->deadline      = deadline;
156     loopTimer_init(asio->timer, _async_socket_get_looper(as),
157                    _on_async_socket_io_timed_out, asio);
158     loopTimer_startAbsolute(asio->timer, deadline);
159 
160     /* Reference socket that is holding this I/O. */
161     async_socket_reference(as);
162 
163     T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
164       _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
165 
166     return asio;
167 }
168 
169 /* Destroys and frees I/O descriptor. */
170 static void
_async_socket_io_free(AsyncSocketIO * asio)171 _async_socket_io_free(AsyncSocketIO* asio)
172 {
173     AsyncSocket* const as = asio->as;
174 
175     T("ASocket %s: %s I/O descriptor %p is destroyed.",
176       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
177 
178     loopTimer_done(asio->timer);
179 
180     /* Try to recycle it first, and free the memory if recycler is full. */
181     if (_recycled_asio_count < _max_recycled_asio_num) {
182         asio->next = _asio_recycled;
183         _asio_recycled = asio;
184         _recycled_asio_count++;
185     } else {
186         AFREE(asio);
187     }
188 
189     /* Release socket that is holding this I/O. */
190     async_socket_release(as);
191 }
192 
193 /* An I/O has been finished and its descriptor is about to be discarded. */
194 static void
_async_socket_io_finished(AsyncSocketIO * asio)195 _async_socket_io_finished(AsyncSocketIO* asio)
196 {
197     /* Notify the client of the I/O that I/O is finished. */
198     asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
199 }
200 
201 int
async_socket_io_reference(AsyncSocketIO * asio)202 async_socket_io_reference(AsyncSocketIO* asio)
203 {
204     assert(asio->ref_count > 0);
205     asio->ref_count++;
206     return asio->ref_count;
207 }
208 
209 int
async_socket_io_release(AsyncSocketIO * asio)210 async_socket_io_release(AsyncSocketIO* asio)
211 {
212     assert(asio->ref_count > 0);
213     asio->ref_count--;
214     if (asio->ref_count == 0) {
215         _async_socket_io_finished(asio);
216         /* Last reference has been dropped. Destroy this object. */
217         _async_socket_io_free(asio);
218         return 0;
219     }
220     return asio->ref_count;
221 }
222 
223 /* Creates new asynchronous socket reader.
224  * Param:
225  *  as - Asynchronous socket for the reader.
226  *  buffer, len - Reader's buffer.
227  *  io_cb - Reader's callback.
228  *  reader_opaque - An opaque pointer associated with the reader.
229  *  deadline - Deadline to complete the operation.
230  * Return:
231  *  An initialized AsyncSocketIO intance.
232  */
233 static AsyncSocketIO*
_async_socket_reader_new(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb io_cb,void * reader_opaque,Duration deadline)234 _async_socket_reader_new(AsyncSocket* as,
235                          void* buffer,
236                          uint32_t len,
237                          on_as_io_cb io_cb,
238                          void* reader_opaque,
239                          Duration deadline)
240 {
241     AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
242                                                      reader_opaque, deadline);
243     return asio;
244 }
245 
246 /* Creates new asynchronous socket writer.
247  * Param:
248  *  as - Asynchronous socket for the writer.
249  *  buffer, len - Writer's buffer.
250  *  io_cb - Writer's callback.
251  *  writer_opaque - An opaque pointer associated with the writer.
252  *  deadline - Deadline to complete the operation.
253  * Return:
254  *  An initialized AsyncSocketIO intance.
255  */
256 static AsyncSocketIO*
_async_socket_writer_new(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb io_cb,void * writer_opaque,Duration deadline)257 _async_socket_writer_new(AsyncSocket* as,
258                          const void* buffer,
259                          uint32_t len,
260                          on_as_io_cb io_cb,
261                          void* writer_opaque,
262                          Duration deadline)
263 {
264     AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
265                                                      io_cb, writer_opaque,
266                                                      deadline);
267     return asio;
268 }
269 
270 /* I/O timed out. */
271 static void
_on_async_socket_io_timed_out(void * opaque)272 _on_async_socket_io_timed_out(void* opaque)
273 {
274     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
275     AsyncSocket* const as = asio->as;
276 
277     D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
278       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
279       asio->deadline, async_socket_deadline(as, 0));
280 
281     /* Reference while in callback. */
282     async_socket_io_reference(asio);
283     _async_socket_io_timed_out(asio->as, asio);
284     async_socket_io_release(asio);
285 }
286 
287 /********************************************************************************
288  *                 Public Asynchronous Socket I/O API
289  *******************************************************************************/
290 
291 AsyncSocket*
async_socket_io_get_socket(const AsyncSocketIO * asio)292 async_socket_io_get_socket(const AsyncSocketIO* asio)
293 {
294     async_socket_reference(asio->as);
295     return asio->as;
296 }
297 
298 void
async_socket_io_cancel_time_out(AsyncSocketIO * asio)299 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
300 {
301     loopTimer_stop(asio->timer);
302 }
303 
304 void*
async_socket_io_get_io_opaque(const AsyncSocketIO * asio)305 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
306 {
307     return asio->io_opaque;
308 }
309 
310 void*
async_socket_io_get_client_opaque(const AsyncSocketIO * asio)311 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
312 {
313     return async_socket_get_client_opaque(asio->as);
314 }
315 
316 void*
async_socket_io_get_buffer_info(const AsyncSocketIO * asio,uint32_t * transferred,uint32_t * to_transfer)317 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
318                                 uint32_t* transferred,
319                                 uint32_t* to_transfer)
320 {
321     if (transferred != NULL) {
322         *transferred = asio->transferred;
323     }
324     if (to_transfer != NULL) {
325         *to_transfer = asio->to_transfer;
326     }
327     return asio->buffer;
328 }
329 
330 void*
async_socket_io_get_buffer(const AsyncSocketIO * asio)331 async_socket_io_get_buffer(const AsyncSocketIO* asio)
332 {
333     return asio->buffer;
334 }
335 
336 uint32_t
async_socket_io_get_transferred(const AsyncSocketIO * asio)337 async_socket_io_get_transferred(const AsyncSocketIO* asio)
338 {
339     return asio->transferred;
340 }
341 
342 uint32_t
async_socket_io_get_to_transfer(const AsyncSocketIO * asio)343 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
344 {
345     return asio->to_transfer;
346 }
347 
348 int
async_socket_io_is_read(const AsyncSocketIO * asio)349 async_socket_io_is_read(const AsyncSocketIO* asio)
350 {
351     return asio->is_io_read;
352 }
353 
354 /********************************************************************************
355  *                      Asynchronous Socket internals
356  *******************************************************************************/
357 
358 struct AsyncSocket {
359     /* TCP address for the socket. */
360     SockAddress         address;
361     /* Connection callback for this socket. */
362     on_as_connection_cb on_connection;
363     /* An opaque pointer associated with this socket by the client. */
364     void*               client_opaque;
365     /* I/O looper for asynchronous I/O on the socket. */
366     Looper*             looper;
367     /* I/O descriptor for asynchronous I/O on the socket. */
368     LoopIo              io[1];
369     /* Timer to use for reconnection attempts. */
370     LoopTimer           reconnect_timer[1];
371     /* Head of the list of the active readers. */
372     AsyncSocketIO*      readers_head;
373     /* Tail of the list of the active readers. */
374     AsyncSocketIO*      readers_tail;
375     /* Head of the list of the active writers. */
376     AsyncSocketIO*      writers_head;
377     /* Tail of the list of the active writers. */
378     AsyncSocketIO*      writers_tail;
379     /* Socket's file descriptor. */
380     int                 fd;
381     /* Timeout to use for reconnection attempts. */
382     int                 reconnect_to;
383     /* Number of outstanding references to the socket. */
384     int                 ref_count;
385     /* Flags whether (1) or not (0) socket owns the looper. */
386     int                 owns_looper;
387 };
388 
389 static const char*
_async_socket_string(AsyncSocket * as)390 _async_socket_string(AsyncSocket* as)
391 {
392     return sock_address_to_string(&as->address);
393 }
394 
395 static Looper*
_async_socket_get_looper(AsyncSocket * as)396 _async_socket_get_looper(AsyncSocket* as)
397 {
398     return as->looper;
399 }
400 
401 /* Pulls first reader out of the list.
402  * Param:
403  *  as - Initialized AsyncSocket instance.
404  * Return:
405  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
406  *  Note that the caller is responsible for releasing the I/O object returned
407  *  from this routine.
408  */
409 static AsyncSocketIO*
_async_socket_pull_first_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)410 _async_socket_pull_first_io(AsyncSocket* as,
411                             AsyncSocketIO** list_head,
412                             AsyncSocketIO** list_tail)
413 {
414     AsyncSocketIO* const ret = *list_head;
415     if (ret != NULL) {
416         *list_head = ret->next;
417         ret->next = NULL;
418         if (*list_head == NULL) {
419             *list_tail = NULL;
420         }
421     }
422     return ret;
423 }
424 
425 /* Pulls first reader out of the list.
426  * Param:
427  *  as - Initialized AsyncSocket instance.
428  * Return:
429  *  First reader pulled out of the list, or NULL if there are no readers in the
430  *  list.
431  *  Note that the caller is responsible for releasing the I/O object returned
432  *  from this routine.
433  */
434 static AsyncSocketIO*
_async_socket_pull_first_reader(AsyncSocket * as)435 _async_socket_pull_first_reader(AsyncSocket* as)
436 {
437     return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
438 }
439 
440 /* Pulls first writer out of the list.
441  * Param:
442  *  as - Initialized AsyncSocket instance.
443  * Return:
444  *  First writer pulled out of the list, or NULL if there are no writers in the
445  *  list.
446  *  Note that the caller is responsible for releasing the I/O object returned
447  *  from this routine.
448  */
449 static AsyncSocketIO*
_async_socket_pull_first_writer(AsyncSocket * as)450 _async_socket_pull_first_writer(AsyncSocket* as)
451 {
452     return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
453 }
454 
455 /* Removes an I/O descriptor from a list of active I/O.
456  * Param:
457  *  as - Initialized AsyncSocket instance.
458  *  list_head, list_tail - Pointers to the list head and tail.
459  *  io - I/O to remove.
460  * Return:
461  *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
462  */
463 static int
_async_socket_remove_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail,AsyncSocketIO * io)464 _async_socket_remove_io(AsyncSocket* as,
465                         AsyncSocketIO** list_head,
466                         AsyncSocketIO** list_tail,
467                         AsyncSocketIO* io)
468 {
469     AsyncSocketIO* prev = NULL;
470 
471     while (*list_head != NULL && io != *list_head) {
472         prev = *list_head;
473         list_head = &((*list_head)->next);
474     }
475     if (*list_head == NULL) {
476         D("%s: I/O %p is not found in the list for socket '%s'",
477           __FUNCTION__, io, _async_socket_string(as));
478         return 0;
479     }
480 
481     *list_head = io->next;
482     if (prev != NULL) {
483         prev->next = io->next;
484     }
485     if (*list_tail == io) {
486         *list_tail = prev;
487     }
488 
489     /* Release I/O adjusting reference added when I/O has been saved in the list. */
490     async_socket_io_release(io);
491 
492     return 1;
493 }
494 
495 /* Advances to the next I/O in the list.
496  * Param:
497  *  as - Initialized AsyncSocket instance.
498  *  list_head, list_tail - Pointers to the list head and tail.
499  */
500 static void
_async_socket_advance_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)501 _async_socket_advance_io(AsyncSocket* as,
502                          AsyncSocketIO** list_head,
503                          AsyncSocketIO** list_tail)
504 {
505     AsyncSocketIO* first_io = *list_head;
506     if (first_io != NULL) {
507         *list_head = first_io->next;
508         first_io->next = NULL;
509     }
510     if (*list_head == NULL) {
511         *list_tail = NULL;
512     }
513     if (first_io != NULL) {
514         /* Release I/O removed from the head of the list. */
515         async_socket_io_release(first_io);
516     }
517 }
518 
519 /* Advances to the next reader in the list.
520  * Param:
521  *  as - Initialized AsyncSocket instance.
522  */
523 static void
_async_socket_advance_reader(AsyncSocket * as)524 _async_socket_advance_reader(AsyncSocket* as)
525 {
526     _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
527 }
528 
529 /* Advances to the next writer in the list.
530  * Param:
531  *  as - Initialized AsyncSocket instance.
532  */
533 static void
_async_socket_advance_writer(AsyncSocket * as)534 _async_socket_advance_writer(AsyncSocket* as)
535 {
536     _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
537 }
538 
539 /* Completes an I/O.
540  * Param:
541  *  as - Initialized AsyncSocket instance.
542  *  asio - I/O to complete.
543  * Return:
544  *  One of AsyncIOAction values.
545  */
546 static AsyncIOAction
_async_socket_complete_io(AsyncSocket * as,AsyncSocketIO * asio)547 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
548 {
549     T("ASocket %s: %s I/O %p is completed.",
550       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
551 
552     /* Stop the timer. */
553     async_socket_io_cancel_time_out(asio);
554 
555     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
556 }
557 
558 /* Timeouts an I/O.
559  * Param:
560  *  as - Initialized AsyncSocket instance.
561  *  asio - An I/O that has timed out.
562  * Return:
563  *  One of AsyncIOAction values.
564  */
565 static AsyncIOAction
_async_socket_io_timed_out(AsyncSocket * as,AsyncSocketIO * asio)566 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
567 {
568     T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
569       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
570       asio->deadline, async_socket_deadline(as, 0));
571 
572     /* Report to the client. */
573     const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
574                                              ASIO_STATE_TIMED_OUT);
575 
576     /* Remove the I/O from a list of active I/O for actions other than retry. */
577     if (action != ASIO_ACTION_RETRY) {
578         if (asio->is_io_read) {
579             _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
580         } else {
581             _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
582         }
583     }
584 
585     return action;
586 }
587 
588 /* Cancels an I/O.
589  * Param:
590  *  as - Initialized AsyncSocket instance.
591  *  asio - An I/O to cancel.
592  * Return:
593  *  One of AsyncIOAction values.
594  */
595 static AsyncIOAction
_async_socket_cancel_io(AsyncSocket * as,AsyncSocketIO * asio)596 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
597 {
598     T("ASocket %s: %s I/O %p is cancelled.",
599       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
600 
601     /* Stop the timer. */
602     async_socket_io_cancel_time_out(asio);
603 
604     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
605 }
606 
607 /* Reports an I/O failure.
608  * Param:
609  *  as - Initialized AsyncSocket instance.
610  *  asio - An I/O that has failed. Can be NULL for general failures.
611  *  failure - Failure (errno) that has occurred.
612  * Return:
613  *  One of AsyncIOAction values.
614  */
615 static AsyncIOAction
_async_socket_io_failure(AsyncSocket * as,AsyncSocketIO * asio,int failure)616 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
617 {
618     T("ASocket %s: %s I/O %p has failed: %d -> %s",
619       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
620       failure, strerror(failure));
621 
622     /* Stop the timer. */
623     async_socket_io_cancel_time_out(asio);
624 
625     errno = failure;
626     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
627 }
628 
629 /* Cancels all the active socket readers.
630  * Param:
631  *  as - Initialized AsyncSocket instance.
632  */
633 static void
_async_socket_cancel_readers(AsyncSocket * as)634 _async_socket_cancel_readers(AsyncSocket* as)
635 {
636     while (as->readers_head != NULL) {
637         AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
638         /* We ignore action returned from the cancellation callback, since we're
639          * in a disconnected state here. */
640         _async_socket_cancel_io(as, to_cancel);
641         async_socket_io_release(to_cancel);
642     }
643 }
644 
645 /* Cancels all the active socket writers.
646  * Param:
647  *  as - Initialized AsyncSocket instance.
648  */
649 static void
_async_socket_cancel_writers(AsyncSocket * as)650 _async_socket_cancel_writers(AsyncSocket* as)
651 {
652     while (as->writers_head != NULL) {
653         AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
654         /* We ignore action returned from the cancellation callback, since we're
655          * in a disconnected state here. */
656         _async_socket_cancel_io(as, to_cancel);
657         async_socket_io_release(to_cancel);
658     }
659 }
660 
661 /* Cancels all the I/O on the socket. */
662 static void
_async_socket_cancel_all_io(AsyncSocket * as)663 _async_socket_cancel_all_io(AsyncSocket* as)
664 {
665     /* Stop the reconnection timer. */
666     loopTimer_stop(as->reconnect_timer);
667 
668     /* Stop read / write on the socket. */
669     loopIo_dontWantWrite(as->io);
670     loopIo_dontWantRead(as->io);
671 
672     /* Cancel active readers and writers. */
673     _async_socket_cancel_readers(as);
674     _async_socket_cancel_writers(as);
675 }
676 
677 /* Closes socket handle used by the async socket.
678  * Param:
679  *  as - Initialized AsyncSocket instance.
680  */
681 static void
_async_socket_close_socket(AsyncSocket * as)682 _async_socket_close_socket(AsyncSocket* as)
683 {
684     if (as->fd >= 0) {
685         T("ASocket %s: Socket handle %d is closed.",
686           _async_socket_string(as), as->fd);
687         loopIo_done(as->io);
688         socket_close(as->fd);
689         as->fd = -1;
690     }
691 }
692 
693 /* Destroys AsyncSocket instance.
694  * Param:
695  *  as - Initialized AsyncSocket instance.
696  */
697 static void
_async_socket_free(AsyncSocket * as)698 _async_socket_free(AsyncSocket* as)
699 {
700     if (as != NULL) {
701         T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
702 
703         /* Close socket. */
704         _async_socket_close_socket(as);
705 
706         /* Free allocated resources. */
707         if (as->looper != NULL) {
708             loopTimer_done(as->reconnect_timer);
709             if (as->owns_looper) {
710                 looper_free(as->looper);
711             }
712         }
713         sock_address_done(&as->address);
714         AFREE(as);
715     }
716 }
717 
718 /* Starts reconnection attempts after connection has been lost.
719  * Param:
720  *  as - Initialized AsyncSocket instance.
721  *  to - Milliseconds to wait before reconnection attempt.
722  */
723 static void
_async_socket_reconnect(AsyncSocket * as,int to)724 _async_socket_reconnect(AsyncSocket* as, int to)
725 {
726     T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
727 
728     /* Make sure that no I/O is active, and socket is closed before we
729      * reconnect. */
730     _async_socket_cancel_all_io(as);
731 
732     /* Set the timer for reconnection attempt. */
733     loopTimer_startRelative(as->reconnect_timer, to);
734 }
735 
736 /********************************************************************************
737  *                      Asynchronous Socket callbacks
738  *******************************************************************************/
739 
740 /* A callback that is invoked when socket gets disconnected.
741  * Param:
742  *  as - Initialized AsyncSocket instance.
743  */
744 static void
_on_async_socket_disconnected(AsyncSocket * as)745 _on_async_socket_disconnected(AsyncSocket* as)
746 {
747     /* Save error to restore it for the client's callback. */
748     const int save_errno = errno;
749     AsyncIOAction action = ASIO_ACTION_ABORT;
750 
751     D("ASocket %s: Disconnected.", _async_socket_string(as));
752 
753     /* Cancel all the I/O on this socket. */
754     _async_socket_cancel_all_io(as);
755 
756     /* Close the socket. */
757     _async_socket_close_socket(as);
758 
759     /* Restore errno, and invoke client's callback. */
760     errno = save_errno;
761     action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
762 
763     if (action == ASIO_ACTION_RETRY) {
764         /* Client requested reconnection. */
765         _async_socket_reconnect(as, as->reconnect_to);
766     }
767 }
768 
769 /* A callback that is invoked on socket's I/O failure.
770  * Param:
771  *  as - Initialized AsyncSocket instance.
772  *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
773  */
774 static AsyncIOAction
_on_async_socket_failure(AsyncSocket * as,AsyncSocketIO * asio)775 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
776 {
777     D("ASocket %s: %s I/O failure: %d -> %s",
778       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
779       errno, strerror(errno));
780 
781     /* Report the failure. */
782     return _async_socket_io_failure(as, asio, errno);
783 }
784 
785 /* A callback that is invoked when there is data available to read.
786  * Param:
787  *  as - Initialized AsyncSocket instance.
788  * Return:
789  *  0 on success, or -1 on failure. Failure returned from this routine will
790  *  skip writes (if awailable) behind this read.
791  */
792 static int
_on_async_socket_recv(AsyncSocket * as)793 _on_async_socket_recv(AsyncSocket* as)
794 {
795     AsyncIOAction action;
796 
797     /* Get current reader. */
798     AsyncSocketIO* const asr = as->readers_head;
799     if (asr == NULL) {
800         D("ASocket %s: No reader is available.", _async_socket_string(as));
801         loopIo_dontWantRead(as->io);
802         return 0;
803     }
804 
805     /* Reference the reader while we're working with it in this callback. */
806     async_socket_io_reference(asr);
807 
808     /* Bump I/O state, and inform the client that I/O is in progress. */
809     if (asr->state == ASIO_STATE_QUEUED) {
810         asr->state = ASIO_STATE_STARTED;
811     } else {
812         asr->state = ASIO_STATE_CONTINUES;
813     }
814     action = asr->on_io(asr->io_opaque, asr, asr->state);
815     if (action == ASIO_ACTION_ABORT) {
816         D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
817         /* Move on to the next reader. */
818         _async_socket_advance_reader(as);
819         /* Lets see if there are still active readers, and enable, or disable
820          * read I/O callback accordingly. */
821         if (as->readers_head != NULL) {
822             loopIo_wantRead(as->io);
823         } else {
824             loopIo_dontWantRead(as->io);
825         }
826         async_socket_io_release(asr);
827         return 0;
828     }
829 
830     /* Read next chunk of data. */
831     int res = HANDLE_EINTR(
832             socket_recv(as->fd,
833                         asr->buffer + asr->transferred,
834                         asr->to_transfer - asr->transferred));
835     if (res == 0) {
836         /* Socket has been disconnected. */
837         errno = ECONNRESET;
838         _on_async_socket_disconnected(as);
839         async_socket_io_release(asr);
840         return -1;
841     }
842 
843     if (res < 0) {
844         if (errno == EWOULDBLOCK || errno == EAGAIN) {
845             /* Yield to writes behind this read. */
846             loopIo_wantRead(as->io);
847             async_socket_io_release(asr);
848             return 0;
849         }
850 
851         /* An I/O error. */
852         action = _on_async_socket_failure(as, asr);
853         if (action != ASIO_ACTION_RETRY) {
854             D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
855             /* Move on to the next reader. */
856             _async_socket_advance_reader(as);
857             /* Lets see if there are still active readers, and enable, or disable
858              * read I/O callback accordingly. */
859             if (as->readers_head != NULL) {
860                 loopIo_wantRead(as->io);
861             } else {
862                 loopIo_dontWantRead(as->io);
863             }
864         }
865         async_socket_io_release(asr);
866         return -1;
867     }
868 
869     /* Update the reader's descriptor. */
870     asr->transferred += res;
871     if (asr->transferred == asr->to_transfer) {
872         /* This read is completed. Move on to the next reader. */
873         _async_socket_advance_reader(as);
874 
875         /* Notify reader completion. */
876         _async_socket_complete_io(as, asr);
877     }
878 
879     /* Lets see if there are still active readers, and enable, or disable read
880      * I/O callback accordingly. */
881     if (as->readers_head != NULL) {
882         loopIo_wantRead(as->io);
883     } else {
884         loopIo_dontWantRead(as->io);
885     }
886 
887     async_socket_io_release(asr);
888 
889     return 0;
890 }
891 
892 /* A callback that is invoked when there is data available to write.
893  * Param:
894  *  as - Initialized AsyncSocket instance.
895  * Return:
896  *  0 on success, or -1 on failure. Failure returned from this routine will
897  *  skip reads (if awailable) behind this write.
898  */
899 static int
_on_async_socket_send(AsyncSocket * as)900 _on_async_socket_send(AsyncSocket* as)
901 {
902     AsyncIOAction action;
903 
904     /* Get current writer. */
905     AsyncSocketIO* const asw = as->writers_head;
906     if (asw == NULL) {
907         D("ASocket %s: No writer is available.", _async_socket_string(as));
908         loopIo_dontWantWrite(as->io);
909         return 0;
910     }
911 
912     /* Reference the writer while we're working with it in this callback. */
913     async_socket_io_reference(asw);
914 
915     /* Bump I/O state, and inform the client that I/O is in progress. */
916     if (asw->state == ASIO_STATE_QUEUED) {
917         asw->state = ASIO_STATE_STARTED;
918     } else {
919         asw->state = ASIO_STATE_CONTINUES;
920     }
921     action = asw->on_io(asw->io_opaque, asw, asw->state);
922     if (action == ASIO_ACTION_ABORT) {
923         D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
924         /* Move on to the next writer. */
925         _async_socket_advance_writer(as);
926         /* Lets see if there are still active writers, and enable, or disable
927          * write I/O callback accordingly. */
928         if (as->writers_head != NULL) {
929             loopIo_wantWrite(as->io);
930         } else {
931             loopIo_dontWantWrite(as->io);
932         }
933         async_socket_io_release(asw);
934         return 0;
935     }
936 
937     /* Write next chunk of data. */
938     int res = HANDLE_EINTR(
939             socket_send(as->fd,
940                         asw->buffer + asw->transferred,
941                         asw->to_transfer - asw->transferred));
942     if (res == 0) {
943         /* Socket has been disconnected. */
944         errno = ECONNRESET;
945         _on_async_socket_disconnected(as);
946         async_socket_io_release(asw);
947         return -1;
948     }
949 
950     if (res < 0) {
951         if (errno == EWOULDBLOCK || errno == EAGAIN) {
952             /* Yield to reads behind this write. */
953             loopIo_wantWrite(as->io);
954             async_socket_io_release(asw);
955             return 0;
956         }
957 
958         /* An I/O error. */
959         action = _on_async_socket_failure(as, asw);
960         if (action != ASIO_ACTION_RETRY) {
961             D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
962             /* Move on to the next writer. */
963             _async_socket_advance_writer(as);
964             /* Lets see if there are still active writers, and enable, or disable
965              * write I/O callback accordingly. */
966             if (as->writers_head != NULL) {
967                 loopIo_wantWrite(as->io);
968             } else {
969                 loopIo_dontWantWrite(as->io);
970             }
971         }
972         async_socket_io_release(asw);
973         return -1;
974     }
975 
976     /* Update the writer descriptor. */
977     asw->transferred += res;
978     if (asw->transferred == asw->to_transfer) {
979         /* This write is completed. Move on to the next writer. */
980         _async_socket_advance_writer(as);
981 
982         /* Notify writer completion. */
983         _async_socket_complete_io(as, asw);
984     }
985 
986     /* Lets see if there are still active writers, and enable, or disable write
987      * I/O callback accordingly. */
988     if (as->writers_head != NULL) {
989         loopIo_wantWrite(as->io);
990     } else {
991         loopIo_dontWantWrite(as->io);
992     }
993 
994     async_socket_io_release(asw);
995 
996     return 0;
997 }
998 
999 /* A callback that is invoked when an I/O is available on socket.
1000  * Param:
1001  *  as - Initialized AsyncSocket instance.
1002  *  fd - Socket's file descriptor.
1003  *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1004  */
1005 static void
_on_async_socket_io(void * opaque,int fd,unsigned events)1006 _on_async_socket_io(void* opaque, int fd, unsigned events)
1007 {
1008     AsyncSocket* const as = (AsyncSocket*)opaque;
1009 
1010     /* Reference the socket while we're working with it in this callback. */
1011     async_socket_reference(as);
1012 
1013     if ((events & LOOP_IO_READ) != 0) {
1014         if (_on_async_socket_recv(as) != 0) {
1015             async_socket_release(as);
1016             return;
1017         }
1018     }
1019 
1020     if ((events & LOOP_IO_WRITE) != 0) {
1021         if (_on_async_socket_send(as) != 0) {
1022             async_socket_release(as);
1023             return;
1024         }
1025     }
1026 
1027     async_socket_release(as);
1028 }
1029 
1030 /* A callback that is invoked by asynchronous socket connector on connection
1031  *  events.
1032  * Param:
1033  *  opaque - Initialized AsyncSocket instance.
1034  *  connector - Connector that is used to connect this socket.
1035  *  event - Connection event.
1036  * Return:
1037  *  One of AsyncIOAction values.
1038  */
1039 static AsyncIOAction
_on_connector_events(void * opaque,AsyncSocketConnector * connector,AsyncIOState event)1040 _on_connector_events(void* opaque,
1041                      AsyncSocketConnector* connector,
1042                      AsyncIOState event)
1043 {
1044     AsyncIOAction action;
1045     AsyncSocket* const as = (AsyncSocket*)opaque;
1046 
1047     /* Reference the socket while we're working with it in this callback. */
1048     async_socket_reference(as);
1049 
1050     if (event == ASIO_STATE_SUCCEEDED) {
1051         /* Accept the connection. */
1052         as->fd = async_socket_connector_pull_fd(connector);
1053         loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
1054     }
1055 
1056     /* Invoke client's callback. */
1057     action = as->on_connection(as->client_opaque, as, event);
1058     if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
1059         /* For whatever reason the client didn't want to keep this connection.
1060          * Close it. */
1061         D("ASocket %s: Connection is discarded by the client.",
1062           _async_socket_string(as));
1063         _async_socket_close_socket(as);
1064     }
1065 
1066     if (action != ASIO_ACTION_RETRY) {
1067         async_socket_connector_release(connector);
1068     }
1069 
1070     async_socket_release(as);
1071 
1072     return action;
1073 }
1074 
1075 /* Timer callback invoked to reconnect the lost connection.
1076  * Param:
1077  *  as - Initialized AsyncSocket instance.
1078  */
1079 void
_on_async_socket_reconnect(void * opaque)1080 _on_async_socket_reconnect(void* opaque)
1081 {
1082     AsyncSocket* as = (AsyncSocket*)opaque;
1083 
1084     /* Reference the socket while we're working with it in this callback. */
1085     async_socket_reference(as);
1086     async_socket_connect(as, as->reconnect_to);
1087     async_socket_release(as);
1088 }
1089 
1090 
1091 /********************************************************************************
1092  *                  Android Device Socket public API
1093  *******************************************************************************/
1094 
1095 AsyncSocket*
async_socket_new(int port,int reconnect_to,on_as_connection_cb client_cb,void * client_opaque,Looper * looper)1096 async_socket_new(int port,
1097                  int reconnect_to,
1098                  on_as_connection_cb client_cb,
1099                  void* client_opaque,
1100                  Looper* looper)
1101 {
1102     AsyncSocket* as;
1103 
1104     if (client_cb == NULL) {
1105         E("Invalid client_cb parameter");
1106         return NULL;
1107     }
1108 
1109     ANEW0(as);
1110 
1111     as->fd = -1;
1112     as->client_opaque = client_opaque;
1113     as->on_connection = client_cb;
1114     as->readers_head = as->readers_tail = NULL;
1115     as->reconnect_to = reconnect_to;
1116     as->ref_count = 1;
1117     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
1118     if (looper == NULL) {
1119         as->looper = looper_newCore();
1120         if (as->looper == NULL) {
1121             E("Unable to create I/O looper for async socket '%s'",
1122               _async_socket_string(as));
1123             client_cb(client_opaque, as, ASIO_STATE_FAILED);
1124             _async_socket_free(as);
1125             return NULL;
1126         }
1127         as->owns_looper = 1;
1128     } else {
1129         as->looper = looper;
1130         as->owns_looper = 0;
1131     }
1132 
1133     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1134 
1135     T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1136 
1137     return as;
1138 }
1139 
1140 int
async_socket_reference(AsyncSocket * as)1141 async_socket_reference(AsyncSocket* as)
1142 {
1143     assert(as->ref_count > 0);
1144     as->ref_count++;
1145     return as->ref_count;
1146 }
1147 
1148 int
async_socket_release(AsyncSocket * as)1149 async_socket_release(AsyncSocket* as)
1150 {
1151     assert(as->ref_count > 0);
1152     as->ref_count--;
1153     if (as->ref_count == 0) {
1154         /* Last reference has been dropped. Destroy this object. */
1155         _async_socket_cancel_all_io(as);
1156         _async_socket_free(as);
1157         return 0;
1158     }
1159     return as->ref_count;
1160 }
1161 
1162 void
async_socket_connect(AsyncSocket * as,int retry_to)1163 async_socket_connect(AsyncSocket* as, int retry_to)
1164 {
1165     T("ASocket %s: Handling connection request for %dms...",
1166       _async_socket_string(as), retry_to);
1167 
1168     AsyncSocketConnector* const connector =
1169         async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1170                                    as, as->looper);
1171     if (connector != NULL) {
1172         async_socket_connector_connect(connector);
1173     } else {
1174         as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
1175     }
1176 }
1177 
1178 void
async_socket_disconnect(AsyncSocket * as)1179 async_socket_disconnect(AsyncSocket* as)
1180 {
1181     T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1182 
1183     if (as != NULL) {
1184         _async_socket_cancel_all_io(as);
1185         _async_socket_close_socket(as);
1186     }
1187 }
1188 
1189 void
async_socket_reconnect(AsyncSocket * as,int retry_to)1190 async_socket_reconnect(AsyncSocket* as, int retry_to)
1191 {
1192     T("ASocket %s: Handling reconnection request for %dms...",
1193       _async_socket_string(as), retry_to);
1194 
1195     _async_socket_cancel_all_io(as);
1196     _async_socket_close_socket(as);
1197     _async_socket_reconnect(as, retry_to);
1198 }
1199 
1200 void
async_socket_read_abs(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,Duration deadline)1201 async_socket_read_abs(AsyncSocket* as,
1202                       void* buffer, uint32_t len,
1203                       on_as_io_cb reader_cb,
1204                       void* reader_opaque,
1205                       Duration deadline)
1206 {
1207     T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1208       _async_socket_string(as), len, deadline);
1209 
1210     AsyncSocketIO* const asr =
1211         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1212                                  deadline);
1213     if (async_socket_is_connected(as)) {
1214         /* Add new reader to the list. Note that we use initial reference from I/O
1215          * 'new' routine as "in the list" reference counter. */
1216         if (as->readers_head == NULL) {
1217             as->readers_head = as->readers_tail = asr;
1218         } else {
1219             as->readers_tail->next = asr;
1220             as->readers_tail = asr;
1221         }
1222         loopIo_wantRead(as->io);
1223     } else {
1224         D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
1225         errno = ECONNRESET;
1226         reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
1227         async_socket_io_release(asr);
1228     }
1229 }
1230 
1231 void
async_socket_read_rel(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,int to)1232 async_socket_read_rel(AsyncSocket* as,
1233                       void* buffer, uint32_t len,
1234                       on_as_io_cb reader_cb,
1235                       void* reader_opaque,
1236                       int to)
1237 {
1238     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1239                                     DURATION_INFINITE;
1240     async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
1241 }
1242 
1243 void
async_socket_write_abs(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,Duration deadline)1244 async_socket_write_abs(AsyncSocket* as,
1245                        const void* buffer, uint32_t len,
1246                        on_as_io_cb writer_cb,
1247                        void* writer_opaque,
1248                        Duration deadline)
1249 {
1250     T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1251       _async_socket_string(as), len, deadline);
1252 
1253     AsyncSocketIO* const asw =
1254         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1255                                  deadline);
1256     if (async_socket_is_connected(as)) {
1257         /* Add new writer to the list. Note that we use initial reference from I/O
1258          * 'new' routine as "in the list" reference counter. */
1259         if (as->writers_head == NULL) {
1260             as->writers_head = as->writers_tail = asw;
1261         } else {
1262             as->writers_tail->next = asw;
1263             as->writers_tail = asw;
1264         }
1265         loopIo_wantWrite(as->io);
1266     } else {
1267         D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
1268         errno = ECONNRESET;
1269         writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
1270         async_socket_io_release(asw);
1271     }
1272 }
1273 
1274 void
async_socket_write_rel(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,int to)1275 async_socket_write_rel(AsyncSocket* as,
1276                        const void* buffer, uint32_t len,
1277                        on_as_io_cb writer_cb,
1278                        void* writer_opaque,
1279                        int to)
1280 {
1281     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1282                                     DURATION_INFINITE;
1283     async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1284 }
1285 
1286 void*
async_socket_get_client_opaque(const AsyncSocket * as)1287 async_socket_get_client_opaque(const AsyncSocket* as)
1288 {
1289     return as->client_opaque;
1290 }
1291 
1292 Duration
async_socket_deadline(AsyncSocket * as,int rel)1293 async_socket_deadline(AsyncSocket* as, int rel)
1294 {
1295     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1296                         DURATION_INFINITE;
1297 }
1298 
1299 int
async_socket_get_port(const AsyncSocket * as)1300 async_socket_get_port(const AsyncSocket* as)
1301 {
1302     return sock_address_get_port(&as->address);
1303 }
1304 
1305 int
async_socket_is_connected(const AsyncSocket * as)1306 async_socket_is_connected(const AsyncSocket* as)
1307 {
1308     return as->fd >= 0;
1309 }
1310