• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * Encapsulates exchange protocol between the emulator, and an Android device
19  * that is connected to the host via USB. The communication is established over
20  * a TCP port forwarding, enabled by ADB.
21  */
22 
23 #include "qemu-common.h"
24 #include "android/async-utils.h"
25 #include "android/utils/debug.h"
26 #include "android/async-socket-connector.h"
27 #include "android/async-socket.h"
28 #include "utils/panic.h"
29 #include "iolooper.h"
30 
/* Logging shortcuts. E() and W() forward to derror() / dwarning(); D() goes
 * through VERBOSE_PRINT under the 'asyncsocket' tag, and D_ACTIVE checks
 * that tag with VERBOSE_CHECK. */
#define  E(...)    derror(__VA_ARGS__)
#define  W(...)    dwarning(__VA_ARGS__)
#define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)

/* Set to a non-zero value to compile the T() trace messages in. */
#define TRACE_ON    0

/* T() prints through the same channel as D() when tracing is compiled in,
 * and expands to nothing otherwise. */
#if TRACE_ON
#define  T(...)    D(__VA_ARGS__)
#else
#define  T(...)
#endif
43 
44 /********************************************************************************
45  *                  Asynchronous Socket internal API declarations
46  *******************************************************************************/
47 
48 /* Gets socket's address string. */
49 static const char* _async_socket_string(AsyncSocket* as);
50 
51 /* Gets socket's looper. */
52 static Looper* _async_socket_get_looper(AsyncSocket* as);
53 
54 /* Handler for the I/O time out.
55  * Param:
56  *  as - Asynchronous socket for the I/O.
57  *  asio - Desciptor for the timed out I/O.
58  */
59 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
60                                                 AsyncSocketIO* asio);
61 
62 /********************************************************************************
63  *                  Asynchronous Socket Reader / Writer
64  *******************************************************************************/
65 
66 struct AsyncSocketIO {
67     /* Next I/O in the reader, or writer list. */
68     AsyncSocketIO*      next;
69     /* Asynchronous socket for this I/O. */
70     AsyncSocket*        as;
71     /* Timer used for time outs on this I/O. */
72     LoopTimer           timer[1];
73     /* An opaque pointer associated with this I/O. */
74     void*               io_opaque;
75     /* Buffer where to read / write data. */
76     uint8_t*            buffer;
77     /* Bytes to transfer through the socket for this I/O. */
78     uint32_t            to_transfer;
79     /* Bytes thransferred through the socket in this I/O. */
80     uint32_t            transferred;
81     /* I/O callback for this I/O. */
82     on_as_io_cb         on_io;
83     /* I/O type selector: 1 - read, 0 - write. */
84     int                 is_io_read;
85     /* State of the I/O. */
86     AsyncIOState        state;
87     /* Number of outstanding references to the I/O. */
88     int                 ref_count;
89     /* Deadline for this I/O */
90     Duration            deadline;
91 };
92 
93 /*
94  * Recycling I/O instances.
95  * Since AsyncSocketIO instances are not that large, it makes sence to recycle
96  * them for faster allocation, rather than allocating and freeing them for each
97  * I/O on the socket.
98  */
99 
100 /* List of recycled I/O descriptors. */
101 static AsyncSocketIO* _asio_recycled    = NULL;
102 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
103 static int _recycled_asio_count         = 0;
104 /* Maximum number of I/O descriptors that can be recycled. */
105 static const int _max_recycled_asio_num = 32;
106 
/* Timer callback for an I/O time out.
 * It is registered with each descriptor's timer, and is invoked when that
 * timer fires.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
 */
static void _on_async_socket_io_timed_out(void* opaque);
114 
115 /* Creates new I/O descriptor.
116  * Param:
117  *  as - Asynchronous socket for the I/O.
118  *  is_io_read - I/O type selector: 1 - read, 0 - write.
119  *  buffer, len - Reader / writer buffer address.
120  *  io_cb - Callback for this reader / writer.
121  *  io_opaque - An opaque pointer associated with the I/O.
122  *  deadline - Deadline to complete the I/O.
123  * Return:
124  *  Initialized AsyncSocketIO instance.
125  */
126 static AsyncSocketIO*
_async_socket_rw_new(AsyncSocket * as,int is_io_read,void * buffer,uint32_t len,on_as_io_cb io_cb,void * io_opaque,Duration deadline)127 _async_socket_rw_new(AsyncSocket* as,
128                      int is_io_read,
129                      void* buffer,
130                      uint32_t len,
131                      on_as_io_cb io_cb,
132                      void* io_opaque,
133                      Duration deadline)
134 {
135     /* Lookup in the recycler first. */
136     AsyncSocketIO* asio = _asio_recycled;
137     if (asio != NULL) {
138         /* Pull the descriptor from recycler. */
139         _asio_recycled = asio->next;
140         _recycled_asio_count--;
141     } else {
142         /* No recycled descriptors. Allocate new one. */
143         ANEW0(asio);
144     }
145 
146     asio->next          = NULL;
147     asio->as            = as;
148     asio->is_io_read    = is_io_read;
149     asio->buffer        = (uint8_t*)buffer;
150     asio->to_transfer   = len;
151     asio->transferred   = 0;
152     asio->on_io         = io_cb;
153     asio->io_opaque     = io_opaque;
154     asio->state         = ASIO_STATE_QUEUED;
155     asio->ref_count     = 1;
156     asio->deadline      = deadline;
157     loopTimer_init(asio->timer, _async_socket_get_looper(as),
158                    _on_async_socket_io_timed_out, asio);
159     loopTimer_startAbsolute(asio->timer, deadline);
160 
161     /* Reference socket that is holding this I/O. */
162     async_socket_reference(as);
163 
164     T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
165       _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
166 
167     return asio;
168 }
169 
170 /* Destroys and frees I/O descriptor. */
171 static void
_async_socket_io_free(AsyncSocketIO * asio)172 _async_socket_io_free(AsyncSocketIO* asio)
173 {
174     AsyncSocket* const as = asio->as;
175 
176     T("ASocket %s: %s I/O descriptor %p is destroyed.",
177       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
178 
179     loopTimer_done(asio->timer);
180 
181     /* Try to recycle it first, and free the memory if recycler is full. */
182     if (_recycled_asio_count < _max_recycled_asio_num) {
183         asio->next = _asio_recycled;
184         _asio_recycled = asio;
185         _recycled_asio_count++;
186     } else {
187         AFREE(asio);
188     }
189 
190     /* Release socket that is holding this I/O. */
191     async_socket_release(as);
192 }
193 
194 /* An I/O has been finished and its descriptor is about to be discarded. */
195 static void
_async_socket_io_finished(AsyncSocketIO * asio)196 _async_socket_io_finished(AsyncSocketIO* asio)
197 {
198     /* Notify the client of the I/O that I/O is finished. */
199     asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
200 }
201 
202 int
async_socket_io_reference(AsyncSocketIO * asio)203 async_socket_io_reference(AsyncSocketIO* asio)
204 {
205     assert(asio->ref_count > 0);
206     asio->ref_count++;
207     return asio->ref_count;
208 }
209 
210 int
async_socket_io_release(AsyncSocketIO * asio)211 async_socket_io_release(AsyncSocketIO* asio)
212 {
213     assert(asio->ref_count > 0);
214     asio->ref_count--;
215     if (asio->ref_count == 0) {
216         _async_socket_io_finished(asio);
217         /* Last reference has been dropped. Destroy this object. */
218         _async_socket_io_free(asio);
219         return 0;
220     }
221     return asio->ref_count;
222 }
223 
224 /* Creates new asynchronous socket reader.
225  * Param:
226  *  as - Asynchronous socket for the reader.
227  *  buffer, len - Reader's buffer.
228  *  io_cb - Reader's callback.
229  *  reader_opaque - An opaque pointer associated with the reader.
230  *  deadline - Deadline to complete the operation.
231  * Return:
232  *  An initialized AsyncSocketIO intance.
233  */
234 static AsyncSocketIO*
_async_socket_reader_new(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb io_cb,void * reader_opaque,Duration deadline)235 _async_socket_reader_new(AsyncSocket* as,
236                          void* buffer,
237                          uint32_t len,
238                          on_as_io_cb io_cb,
239                          void* reader_opaque,
240                          Duration deadline)
241 {
242     AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
243                                                      reader_opaque, deadline);
244     return asio;
245 }
246 
247 /* Creates new asynchronous socket writer.
248  * Param:
249  *  as - Asynchronous socket for the writer.
250  *  buffer, len - Writer's buffer.
251  *  io_cb - Writer's callback.
252  *  writer_opaque - An opaque pointer associated with the writer.
253  *  deadline - Deadline to complete the operation.
254  * Return:
255  *  An initialized AsyncSocketIO intance.
256  */
257 static AsyncSocketIO*
_async_socket_writer_new(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb io_cb,void * writer_opaque,Duration deadline)258 _async_socket_writer_new(AsyncSocket* as,
259                          const void* buffer,
260                          uint32_t len,
261                          on_as_io_cb io_cb,
262                          void* writer_opaque,
263                          Duration deadline)
264 {
265     AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
266                                                      io_cb, writer_opaque,
267                                                      deadline);
268     return asio;
269 }
270 
271 /* I/O timed out. */
272 static void
_on_async_socket_io_timed_out(void * opaque)273 _on_async_socket_io_timed_out(void* opaque)
274 {
275     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
276     AsyncSocket* const as = asio->as;
277 
278     D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
279       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
280       asio->deadline, async_socket_deadline(as, 0));
281 
282     /* Reference while in callback. */
283     async_socket_io_reference(asio);
284     _async_socket_io_timed_out(asio->as, asio);
285     async_socket_io_release(asio);
286 }
287 
288 /********************************************************************************
289  *                 Public Asynchronous Socket I/O API
290  *******************************************************************************/
291 
292 AsyncSocket*
async_socket_io_get_socket(const AsyncSocketIO * asio)293 async_socket_io_get_socket(const AsyncSocketIO* asio)
294 {
295     async_socket_reference(asio->as);
296     return asio->as;
297 }
298 
299 void
async_socket_io_cancel_time_out(AsyncSocketIO * asio)300 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
301 {
302     loopTimer_stop(asio->timer);
303 }
304 
305 void*
async_socket_io_get_io_opaque(const AsyncSocketIO * asio)306 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
307 {
308     return asio->io_opaque;
309 }
310 
311 void*
async_socket_io_get_client_opaque(const AsyncSocketIO * asio)312 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
313 {
314     return async_socket_get_client_opaque(asio->as);
315 }
316 
317 void*
async_socket_io_get_buffer_info(const AsyncSocketIO * asio,uint32_t * transferred,uint32_t * to_transfer)318 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
319                                 uint32_t* transferred,
320                                 uint32_t* to_transfer)
321 {
322     if (transferred != NULL) {
323         *transferred = asio->transferred;
324     }
325     if (to_transfer != NULL) {
326         *to_transfer = asio->to_transfer;
327     }
328     return asio->buffer;
329 }
330 
331 void*
async_socket_io_get_buffer(const AsyncSocketIO * asio)332 async_socket_io_get_buffer(const AsyncSocketIO* asio)
333 {
334     return asio->buffer;
335 }
336 
337 uint32_t
async_socket_io_get_transferred(const AsyncSocketIO * asio)338 async_socket_io_get_transferred(const AsyncSocketIO* asio)
339 {
340     return asio->transferred;
341 }
342 
343 uint32_t
async_socket_io_get_to_transfer(const AsyncSocketIO * asio)344 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
345 {
346     return asio->to_transfer;
347 }
348 
349 int
async_socket_io_is_read(const AsyncSocketIO * asio)350 async_socket_io_is_read(const AsyncSocketIO* asio)
351 {
352     return asio->is_io_read;
353 }
354 
355 /********************************************************************************
356  *                      Asynchronous Socket internals
357  *******************************************************************************/
358 
359 struct AsyncSocket {
360     /* TCP address for the socket. */
361     SockAddress         address;
362     /* Connection callback for this socket. */
363     on_as_connection_cb on_connection;
364     /* An opaque pointer associated with this socket by the client. */
365     void*               client_opaque;
366     /* I/O looper for asynchronous I/O on the socket. */
367     Looper*             looper;
368     /* I/O descriptor for asynchronous I/O on the socket. */
369     LoopIo              io[1];
370     /* Timer to use for reconnection attempts. */
371     LoopTimer           reconnect_timer[1];
372     /* Head of the list of the active readers. */
373     AsyncSocketIO*      readers_head;
374     /* Tail of the list of the active readers. */
375     AsyncSocketIO*      readers_tail;
376     /* Head of the list of the active writers. */
377     AsyncSocketIO*      writers_head;
378     /* Tail of the list of the active writers. */
379     AsyncSocketIO*      writers_tail;
380     /* Socket's file descriptor. */
381     int                 fd;
382     /* Timeout to use for reconnection attempts. */
383     int                 reconnect_to;
384     /* Number of outstanding references to the socket. */
385     int                 ref_count;
386     /* Flags whether (1) or not (0) socket owns the looper. */
387     int                 owns_looper;
388 };
389 
390 static const char*
_async_socket_string(AsyncSocket * as)391 _async_socket_string(AsyncSocket* as)
392 {
393     return sock_address_to_string(&as->address);
394 }
395 
396 static Looper*
_async_socket_get_looper(AsyncSocket * as)397 _async_socket_get_looper(AsyncSocket* as)
398 {
399     return as->looper;
400 }
401 
402 /* Pulls first reader out of the list.
403  * Param:
404  *  as - Initialized AsyncSocket instance.
405  * Return:
406  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
407  *  Note that the caller is responsible for releasing the I/O object returned
408  *  from this routine.
409  */
410 static AsyncSocketIO*
_async_socket_pull_first_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)411 _async_socket_pull_first_io(AsyncSocket* as,
412                             AsyncSocketIO** list_head,
413                             AsyncSocketIO** list_tail)
414 {
415     AsyncSocketIO* const ret = *list_head;
416     if (ret != NULL) {
417         *list_head = ret->next;
418         ret->next = NULL;
419         if (*list_head == NULL) {
420             *list_tail = NULL;
421         }
422     }
423     return ret;
424 }
425 
426 /* Pulls first reader out of the list.
427  * Param:
428  *  as - Initialized AsyncSocket instance.
429  * Return:
430  *  First reader pulled out of the list, or NULL if there are no readers in the
431  *  list.
432  *  Note that the caller is responsible for releasing the I/O object returned
433  *  from this routine.
434  */
435 static AsyncSocketIO*
_async_socket_pull_first_reader(AsyncSocket * as)436 _async_socket_pull_first_reader(AsyncSocket* as)
437 {
438     return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
439 }
440 
441 /* Pulls first writer out of the list.
442  * Param:
443  *  as - Initialized AsyncSocket instance.
444  * Return:
445  *  First writer pulled out of the list, or NULL if there are no writers in the
446  *  list.
447  *  Note that the caller is responsible for releasing the I/O object returned
448  *  from this routine.
449  */
450 static AsyncSocketIO*
_async_socket_pull_first_writer(AsyncSocket * as)451 _async_socket_pull_first_writer(AsyncSocket* as)
452 {
453     return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
454 }
455 
456 /* Removes an I/O descriptor from a list of active I/O.
457  * Param:
458  *  as - Initialized AsyncSocket instance.
459  *  list_head, list_tail - Pointers to the list head and tail.
460  *  io - I/O to remove.
461  * Return:
462  *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
463  */
464 static int
_async_socket_remove_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail,AsyncSocketIO * io)465 _async_socket_remove_io(AsyncSocket* as,
466                         AsyncSocketIO** list_head,
467                         AsyncSocketIO** list_tail,
468                         AsyncSocketIO* io)
469 {
470     AsyncSocketIO* prev = NULL;
471 
472     while (*list_head != NULL && io != *list_head) {
473         prev = *list_head;
474         list_head = &((*list_head)->next);
475     }
476     if (*list_head == NULL) {
477         D("%s: I/O %p is not found in the list for socket '%s'",
478           __FUNCTION__, io, _async_socket_string(as));
479         return 0;
480     }
481 
482     *list_head = io->next;
483     if (prev != NULL) {
484         prev->next = io->next;
485     }
486     if (*list_tail == io) {
487         *list_tail = prev;
488     }
489 
490     /* Release I/O adjusting reference added when I/O has been saved in the list. */
491     async_socket_io_release(io);
492 
493     return 1;
494 }
495 
496 /* Advances to the next I/O in the list.
497  * Param:
498  *  as - Initialized AsyncSocket instance.
499  *  list_head, list_tail - Pointers to the list head and tail.
500  */
501 static void
_async_socket_advance_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)502 _async_socket_advance_io(AsyncSocket* as,
503                          AsyncSocketIO** list_head,
504                          AsyncSocketIO** list_tail)
505 {
506     AsyncSocketIO* first_io = *list_head;
507     if (first_io != NULL) {
508         *list_head = first_io->next;
509         first_io->next = NULL;
510     }
511     if (*list_head == NULL) {
512         *list_tail = NULL;
513     }
514     if (first_io != NULL) {
515         /* Release I/O removed from the head of the list. */
516         async_socket_io_release(first_io);
517     }
518 }
519 
520 /* Advances to the next reader in the list.
521  * Param:
522  *  as - Initialized AsyncSocket instance.
523  */
524 static void
_async_socket_advance_reader(AsyncSocket * as)525 _async_socket_advance_reader(AsyncSocket* as)
526 {
527     _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
528 }
529 
530 /* Advances to the next writer in the list.
531  * Param:
532  *  as - Initialized AsyncSocket instance.
533  */
534 static void
_async_socket_advance_writer(AsyncSocket * as)535 _async_socket_advance_writer(AsyncSocket* as)
536 {
537     _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
538 }
539 
540 /* Completes an I/O.
541  * Param:
542  *  as - Initialized AsyncSocket instance.
543  *  asio - I/O to complete.
544  * Return:
545  *  One of AsyncIOAction values.
546  */
547 static AsyncIOAction
_async_socket_complete_io(AsyncSocket * as,AsyncSocketIO * asio)548 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
549 {
550     T("ASocket %s: %s I/O %p is completed.",
551       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
552 
553     /* Stop the timer. */
554     async_socket_io_cancel_time_out(asio);
555 
556     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
557 }
558 
559 /* Timeouts an I/O.
560  * Param:
561  *  as - Initialized AsyncSocket instance.
562  *  asio - An I/O that has timed out.
563  * Return:
564  *  One of AsyncIOAction values.
565  */
566 static AsyncIOAction
_async_socket_io_timed_out(AsyncSocket * as,AsyncSocketIO * asio)567 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
568 {
569     T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
570       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
571       asio->deadline, async_socket_deadline(as, 0));
572 
573     /* Report to the client. */
574     const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
575                                              ASIO_STATE_TIMED_OUT);
576 
577     /* Remove the I/O from a list of active I/O for actions other than retry. */
578     if (action != ASIO_ACTION_RETRY) {
579         if (asio->is_io_read) {
580             _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
581         } else {
582             _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
583         }
584     }
585 
586     return action;
587 }
588 
589 /* Cancels an I/O.
590  * Param:
591  *  as - Initialized AsyncSocket instance.
592  *  asio - An I/O to cancel.
593  * Return:
594  *  One of AsyncIOAction values.
595  */
596 static AsyncIOAction
_async_socket_cancel_io(AsyncSocket * as,AsyncSocketIO * asio)597 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
598 {
599     T("ASocket %s: %s I/O %p is cancelled.",
600       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
601 
602     /* Stop the timer. */
603     async_socket_io_cancel_time_out(asio);
604 
605     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
606 }
607 
608 /* Reports an I/O failure.
609  * Param:
610  *  as - Initialized AsyncSocket instance.
611  *  asio - An I/O that has failed. Can be NULL for general failures.
612  *  failure - Failure (errno) that has occurred.
613  * Return:
614  *  One of AsyncIOAction values.
615  */
616 static AsyncIOAction
_async_socket_io_failure(AsyncSocket * as,AsyncSocketIO * asio,int failure)617 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
618 {
619     T("ASocket %s: %s I/O %p has failed: %d -> %s",
620       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
621       failure, strerror(failure));
622 
623     /* Stop the timer. */
624     async_socket_io_cancel_time_out(asio);
625 
626     errno = failure;
627     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
628 }
629 
630 /* Cancels all the active socket readers.
631  * Param:
632  *  as - Initialized AsyncSocket instance.
633  */
634 static void
_async_socket_cancel_readers(AsyncSocket * as)635 _async_socket_cancel_readers(AsyncSocket* as)
636 {
637     while (as->readers_head != NULL) {
638         AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
639         /* We ignore action returned from the cancellation callback, since we're
640          * in a disconnected state here. */
641         _async_socket_cancel_io(as, to_cancel);
642         async_socket_io_release(to_cancel);
643     }
644 }
645 
646 /* Cancels all the active socket writers.
647  * Param:
648  *  as - Initialized AsyncSocket instance.
649  */
650 static void
_async_socket_cancel_writers(AsyncSocket * as)651 _async_socket_cancel_writers(AsyncSocket* as)
652 {
653     while (as->writers_head != NULL) {
654         AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
655         /* We ignore action returned from the cancellation callback, since we're
656          * in a disconnected state here. */
657         _async_socket_cancel_io(as, to_cancel);
658         async_socket_io_release(to_cancel);
659     }
660 }
661 
662 /* Cancels all the I/O on the socket. */
663 static void
_async_socket_cancel_all_io(AsyncSocket * as)664 _async_socket_cancel_all_io(AsyncSocket* as)
665 {
666     /* Stop the reconnection timer. */
667     loopTimer_stop(as->reconnect_timer);
668 
669     /* Stop read / write on the socket. */
670     loopIo_dontWantWrite(as->io);
671     loopIo_dontWantRead(as->io);
672 
673     /* Cancel active readers and writers. */
674     _async_socket_cancel_readers(as);
675     _async_socket_cancel_writers(as);
676 }
677 
678 /* Closes socket handle used by the async socket.
679  * Param:
680  *  as - Initialized AsyncSocket instance.
681  */
682 static void
_async_socket_close_socket(AsyncSocket * as)683 _async_socket_close_socket(AsyncSocket* as)
684 {
685     if (as->fd >= 0) {
686         loopIo_done(as->io);
687         socket_close(as->fd);
688         T("ASocket %s: Socket handle %d is closed.",
689           _async_socket_string(as), as->fd);
690         as->fd = -1;
691     }
692 }
693 
694 /* Destroys AsyncSocket instance.
695  * Param:
696  *  as - Initialized AsyncSocket instance.
697  */
698 static void
_async_socket_free(AsyncSocket * as)699 _async_socket_free(AsyncSocket* as)
700 {
701     if (as != NULL) {
702         T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
703 
704         /* Close socket. */
705         _async_socket_close_socket(as);
706 
707         /* Free allocated resources. */
708         if (as->looper != NULL) {
709             loopTimer_done(as->reconnect_timer);
710             if (as->owns_looper) {
711                 looper_free(as->looper);
712             }
713         }
714         sock_address_done(&as->address);
715         AFREE(as);
716     }
717 }
718 
719 /* Starts reconnection attempts after connection has been lost.
720  * Param:
721  *  as - Initialized AsyncSocket instance.
722  *  to - Milliseconds to wait before reconnection attempt.
723  */
724 static void
_async_socket_reconnect(AsyncSocket * as,int to)725 _async_socket_reconnect(AsyncSocket* as, int to)
726 {
727     T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
728 
729     /* Make sure that no I/O is active, and socket is closed before we
730      * reconnect. */
731     _async_socket_cancel_all_io(as);
732 
733     /* Set the timer for reconnection attempt. */
734     loopTimer_startRelative(as->reconnect_timer, to);
735 }
736 
737 /********************************************************************************
738  *                      Asynchronous Socket callbacks
739  *******************************************************************************/
740 
741 /* A callback that is invoked when socket gets disconnected.
742  * Param:
743  *  as - Initialized AsyncSocket instance.
744  */
745 static void
_on_async_socket_disconnected(AsyncSocket * as)746 _on_async_socket_disconnected(AsyncSocket* as)
747 {
748     /* Save error to restore it for the client's callback. */
749     const int save_errno = errno;
750     AsyncIOAction action = ASIO_ACTION_ABORT;
751 
752     D("ASocket %s: Disconnected.", _async_socket_string(as));
753 
754     /* Cancel all the I/O on this socket. */
755     _async_socket_cancel_all_io(as);
756 
757     /* Close the socket. */
758     _async_socket_close_socket(as);
759 
760     /* Restore errno, and invoke client's callback. */
761     errno = save_errno;
762     action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
763 
764     if (action == ASIO_ACTION_RETRY) {
765         /* Client requested reconnection. */
766         _async_socket_reconnect(as, as->reconnect_to);
767     }
768 }
769 
770 /* A callback that is invoked on socket's I/O failure.
771  * Param:
772  *  as - Initialized AsyncSocket instance.
773  *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
774  */
775 static AsyncIOAction
_on_async_socket_failure(AsyncSocket * as,AsyncSocketIO * asio)776 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
777 {
778     D("ASocket %s: %s I/O failure: %d -> %s",
779       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
780       errno, strerror(errno));
781 
782     /* Report the failure. */
783     return _async_socket_io_failure(as, asio, errno);
784 }
785 
786 /* A callback that is invoked when there is data available to read.
787  * Param:
788  *  as - Initialized AsyncSocket instance.
789  * Return:
790  *  0 on success, or -1 on failure. Failure returned from this routine will
791  *  skip writes (if awailable) behind this read.
792  */
793 static int
_on_async_socket_recv(AsyncSocket * as)794 _on_async_socket_recv(AsyncSocket* as)
795 {
796     AsyncIOAction action;
797 
798     /* Get current reader. */
799     AsyncSocketIO* const asr = as->readers_head;
800     if (asr == NULL) {
801         D("ASocket %s: No reader is available.", _async_socket_string(as));
802         loopIo_dontWantRead(as->io);
803         return 0;
804     }
805 
806     /* Reference the reader while we're working with it in this callback. */
807     async_socket_io_reference(asr);
808 
809     /* Bump I/O state, and inform the client that I/O is in progress. */
810     if (asr->state == ASIO_STATE_QUEUED) {
811         asr->state = ASIO_STATE_STARTED;
812     } else {
813         asr->state = ASIO_STATE_CONTINUES;
814     }
815     action = asr->on_io(asr->io_opaque, asr, asr->state);
816     if (action == ASIO_ACTION_ABORT) {
817         D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
818         /* Move on to the next reader. */
819         _async_socket_advance_reader(as);
820         /* Lets see if there are still active readers, and enable, or disable
821          * read I/O callback accordingly. */
822         if (as->readers_head != NULL) {
823             loopIo_wantRead(as->io);
824         } else {
825             loopIo_dontWantRead(as->io);
826         }
827         async_socket_io_release(asr);
828         return 0;
829     }
830 
831     /* Read next chunk of data. */
832     int res = socket_recv(as->fd, asr->buffer + asr->transferred,
833                           asr->to_transfer - asr->transferred);
834     while (res < 0 && errno == EINTR) {
835         res = socket_recv(as->fd, asr->buffer + asr->transferred,
836                           asr->to_transfer - asr->transferred);
837     }
838 
839     if (res == 0) {
840         /* Socket has been disconnected. */
841         errno = ECONNRESET;
842         _on_async_socket_disconnected(as);
843         async_socket_io_release(asr);
844         return -1;
845     }
846 
847     if (res < 0) {
848         if (errno == EWOULDBLOCK || errno == EAGAIN) {
849             /* Yield to writes behind this read. */
850             loopIo_wantRead(as->io);
851             async_socket_io_release(asr);
852             return 0;
853         }
854 
855         /* An I/O error. */
856         action = _on_async_socket_failure(as, asr);
857         if (action != ASIO_ACTION_RETRY) {
858             D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
859             /* Move on to the next reader. */
860             _async_socket_advance_reader(as);
861             /* Lets see if there are still active readers, and enable, or disable
862              * read I/O callback accordingly. */
863             if (as->readers_head != NULL) {
864                 loopIo_wantRead(as->io);
865             } else {
866                 loopIo_dontWantRead(as->io);
867             }
868         }
869         async_socket_io_release(asr);
870         return -1;
871     }
872 
873     /* Update the reader's descriptor. */
874     asr->transferred += res;
875     if (asr->transferred == asr->to_transfer) {
876         /* This read is completed. Move on to the next reader. */
877         _async_socket_advance_reader(as);
878 
879         /* Notify reader completion. */
880         _async_socket_complete_io(as, asr);
881     }
882 
883     /* Lets see if there are still active readers, and enable, or disable read
884      * I/O callback accordingly. */
885     if (as->readers_head != NULL) {
886         loopIo_wantRead(as->io);
887     } else {
888         loopIo_dontWantRead(as->io);
889     }
890 
891     async_socket_io_release(asr);
892 
893     return 0;
894 }
895 
896 /* A callback that is invoked when there is data available to write.
897  * Param:
898  *  as - Initialized AsyncSocket instance.
899  * Return:
900  *  0 on success, or -1 on failure. Failure returned from this routine will
901  *  skip reads (if awailable) behind this write.
902  */
903 static int
_on_async_socket_send(AsyncSocket * as)904 _on_async_socket_send(AsyncSocket* as)
905 {
906     AsyncIOAction action;
907 
908     /* Get current writer. */
909     AsyncSocketIO* const asw = as->writers_head;
910     if (asw == NULL) {
911         D("ASocket %s: No writer is available.", _async_socket_string(as));
912         loopIo_dontWantWrite(as->io);
913         return 0;
914     }
915 
916     /* Reference the writer while we're working with it in this callback. */
917     async_socket_io_reference(asw);
918 
919     /* Bump I/O state, and inform the client that I/O is in progress. */
920     if (asw->state == ASIO_STATE_QUEUED) {
921         asw->state = ASIO_STATE_STARTED;
922     } else {
923         asw->state = ASIO_STATE_CONTINUES;
924     }
925     action = asw->on_io(asw->io_opaque, asw, asw->state);
926     if (action == ASIO_ACTION_ABORT) {
927         D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
928         /* Move on to the next writer. */
929         _async_socket_advance_writer(as);
930         /* Lets see if there are still active writers, and enable, or disable
931          * write I/O callback accordingly. */
932         if (as->writers_head != NULL) {
933             loopIo_wantWrite(as->io);
934         } else {
935             loopIo_dontWantWrite(as->io);
936         }
937         async_socket_io_release(asw);
938         return 0;
939     }
940 
941     /* Write next chunk of data. */
942     int res = socket_send(as->fd, asw->buffer + asw->transferred,
943                           asw->to_transfer - asw->transferred);
944     while (res < 0 && errno == EINTR) {
945         res = socket_send(as->fd, asw->buffer + asw->transferred,
946                           asw->to_transfer - asw->transferred);
947     }
948 
949     if (res == 0) {
950         /* Socket has been disconnected. */
951         errno = ECONNRESET;
952         _on_async_socket_disconnected(as);
953         async_socket_io_release(asw);
954         return -1;
955     }
956 
957     if (res < 0) {
958         if (errno == EWOULDBLOCK || errno == EAGAIN) {
959             /* Yield to reads behind this write. */
960             loopIo_wantWrite(as->io);
961             async_socket_io_release(asw);
962             return 0;
963         }
964 
965         /* An I/O error. */
966         action = _on_async_socket_failure(as, asw);
967         if (action != ASIO_ACTION_RETRY) {
968             D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
969             /* Move on to the next writer. */
970             _async_socket_advance_writer(as);
971             /* Lets see if there are still active writers, and enable, or disable
972              * write I/O callback accordingly. */
973             if (as->writers_head != NULL) {
974                 loopIo_wantWrite(as->io);
975             } else {
976                 loopIo_dontWantWrite(as->io);
977             }
978         }
979         async_socket_io_release(asw);
980         return -1;
981     }
982 
983     /* Update the writer descriptor. */
984     asw->transferred += res;
985     if (asw->transferred == asw->to_transfer) {
986         /* This write is completed. Move on to the next writer. */
987         _async_socket_advance_writer(as);
988 
989         /* Notify writer completion. */
990         _async_socket_complete_io(as, asw);
991     }
992 
993     /* Lets see if there are still active writers, and enable, or disable write
994      * I/O callback accordingly. */
995     if (as->writers_head != NULL) {
996         loopIo_wantWrite(as->io);
997     } else {
998         loopIo_dontWantWrite(as->io);
999     }
1000 
1001     async_socket_io_release(asw);
1002 
1003     return 0;
1004 }
1005 
1006 /* A callback that is invoked when an I/O is available on socket.
1007  * Param:
1008  *  as - Initialized AsyncSocket instance.
1009  *  fd - Socket's file descriptor.
1010  *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1011  */
1012 static void
_on_async_socket_io(void * opaque,int fd,unsigned events)1013 _on_async_socket_io(void* opaque, int fd, unsigned events)
1014 {
1015     AsyncSocket* const as = (AsyncSocket*)opaque;
1016 
1017     /* Reference the socket while we're working with it in this callback. */
1018     async_socket_reference(as);
1019 
1020     if ((events & LOOP_IO_READ) != 0) {
1021         if (_on_async_socket_recv(as) != 0) {
1022             async_socket_release(as);
1023             return;
1024         }
1025     }
1026 
1027     if ((events & LOOP_IO_WRITE) != 0) {
1028         if (_on_async_socket_send(as) != 0) {
1029             async_socket_release(as);
1030             return;
1031         }
1032     }
1033 
1034     async_socket_release(as);
1035 }
1036 
1037 /* A callback that is invoked by asynchronous socket connector on connection
1038  *  events.
1039  * Param:
1040  *  opaque - Initialized AsyncSocket instance.
1041  *  connector - Connector that is used to connect this socket.
1042  *  event - Connection event.
1043  * Return:
1044  *  One of AsyncIOAction values.
1045  */
1046 static AsyncIOAction
_on_connector_events(void * opaque,AsyncSocketConnector * connector,AsyncIOState event)1047 _on_connector_events(void* opaque,
1048                      AsyncSocketConnector* connector,
1049                      AsyncIOState event)
1050 {
1051     AsyncIOAction action;
1052     AsyncSocket* const as = (AsyncSocket*)opaque;
1053 
1054     /* Reference the socket while we're working with it in this callback. */
1055     async_socket_reference(as);
1056 
1057     if (event == ASIO_STATE_SUCCEEDED) {
1058         /* Accept the connection. */
1059         as->fd = async_socket_connector_pull_fd(connector);
1060         loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
1061     }
1062 
1063     /* Invoke client's callback. */
1064     action = as->on_connection(as->client_opaque, as, event);
1065     if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
1066         /* For whatever reason the client didn't want to keep this connection.
1067          * Close it. */
1068         D("ASocket %s: Connection is discarded by the client.",
1069           _async_socket_string(as));
1070         _async_socket_close_socket(as);
1071     }
1072 
1073     if (action != ASIO_ACTION_RETRY) {
1074         async_socket_connector_release(connector);
1075     }
1076 
1077     async_socket_release(as);
1078 
1079     return action;
1080 }
1081 
1082 /* Timer callback invoked to reconnect the lost connection.
1083  * Param:
1084  *  as - Initialized AsyncSocket instance.
1085  */
1086 void
_on_async_socket_reconnect(void * opaque)1087 _on_async_socket_reconnect(void* opaque)
1088 {
1089     AsyncSocket* as = (AsyncSocket*)opaque;
1090 
1091     /* Reference the socket while we're working with it in this callback. */
1092     async_socket_reference(as);
1093     async_socket_connect(as, as->reconnect_to);
1094     async_socket_release(as);
1095 }
1096 
1097 
/********************************************************************************
 *                  Asynchronous Socket public API
 *******************************************************************************/
1101 
1102 AsyncSocket*
async_socket_new(int port,int reconnect_to,on_as_connection_cb client_cb,void * client_opaque,Looper * looper)1103 async_socket_new(int port,
1104                  int reconnect_to,
1105                  on_as_connection_cb client_cb,
1106                  void* client_opaque,
1107                  Looper* looper)
1108 {
1109     AsyncSocket* as;
1110 
1111     if (client_cb == NULL) {
1112         E("Invalid client_cb parameter");
1113         return NULL;
1114     }
1115 
1116     ANEW0(as);
1117 
1118     as->fd = -1;
1119     as->client_opaque = client_opaque;
1120     as->on_connection = client_cb;
1121     as->readers_head = as->readers_tail = NULL;
1122     as->reconnect_to = reconnect_to;
1123     as->ref_count = 1;
1124     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
1125     if (looper == NULL) {
1126         as->looper = looper_newCore();
1127         if (as->looper == NULL) {
1128             E("Unable to create I/O looper for async socket '%s'",
1129               _async_socket_string(as));
1130             client_cb(client_opaque, as, ASIO_STATE_FAILED);
1131             _async_socket_free(as);
1132             return NULL;
1133         }
1134         as->owns_looper = 1;
1135     } else {
1136         as->looper = looper;
1137         as->owns_looper = 0;
1138     }
1139 
1140     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1141 
1142     T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1143 
1144     return as;
1145 }
1146 
1147 int
async_socket_reference(AsyncSocket * as)1148 async_socket_reference(AsyncSocket* as)
1149 {
1150     assert(as->ref_count > 0);
1151     as->ref_count++;
1152     return as->ref_count;
1153 }
1154 
1155 int
async_socket_release(AsyncSocket * as)1156 async_socket_release(AsyncSocket* as)
1157 {
1158     assert(as->ref_count > 0);
1159     as->ref_count--;
1160     if (as->ref_count == 0) {
1161         /* Last reference has been dropped. Destroy this object. */
1162         _async_socket_cancel_all_io(as);
1163         _async_socket_free(as);
1164         return 0;
1165     }
1166     return as->ref_count;
1167 }
1168 
1169 void
async_socket_connect(AsyncSocket * as,int retry_to)1170 async_socket_connect(AsyncSocket* as, int retry_to)
1171 {
1172     T("ASocket %s: Handling connection request for %dms...",
1173       _async_socket_string(as), retry_to);
1174 
1175     AsyncSocketConnector* const connector =
1176         async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1177                                    as, as->looper);
1178     if (connector != NULL) {
1179         async_socket_connector_connect(connector);
1180     } else {
1181         as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
1182     }
1183 }
1184 
1185 void
async_socket_disconnect(AsyncSocket * as)1186 async_socket_disconnect(AsyncSocket* as)
1187 {
1188     T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1189 
1190     if (as != NULL) {
1191         _async_socket_cancel_all_io(as);
1192         _async_socket_close_socket(as);
1193     }
1194 }
1195 
1196 void
async_socket_reconnect(AsyncSocket * as,int retry_to)1197 async_socket_reconnect(AsyncSocket* as, int retry_to)
1198 {
1199     T("ASocket %s: Handling reconnection request for %dms...",
1200       _async_socket_string(as), retry_to);
1201 
1202     _async_socket_cancel_all_io(as);
1203     _async_socket_close_socket(as);
1204     _async_socket_reconnect(as, retry_to);
1205 }
1206 
1207 void
async_socket_read_abs(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,Duration deadline)1208 async_socket_read_abs(AsyncSocket* as,
1209                       void* buffer, uint32_t len,
1210                       on_as_io_cb reader_cb,
1211                       void* reader_opaque,
1212                       Duration deadline)
1213 {
1214     T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1215       _async_socket_string(as), len, deadline);
1216 
1217     AsyncSocketIO* const asr =
1218         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1219                                  deadline);
1220     /* Add new reader to the list. Note that we use initial reference from I/O
1221      * 'new' routine as "in the list" reference counter. */
1222     if (as->readers_head == NULL) {
1223         as->readers_head = as->readers_tail = asr;
1224     } else {
1225         as->readers_tail->next = asr;
1226         as->readers_tail = asr;
1227     }
1228     loopIo_wantRead(as->io);
1229 }
1230 
1231 void
async_socket_read_rel(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,int to)1232 async_socket_read_rel(AsyncSocket* as,
1233                       void* buffer, uint32_t len,
1234                       on_as_io_cb reader_cb,
1235                       void* reader_opaque,
1236                       int to)
1237 {
1238     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1239                                     DURATION_INFINITE;
1240     async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
1241 }
1242 
1243 void
async_socket_write_abs(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,Duration deadline)1244 async_socket_write_abs(AsyncSocket* as,
1245                        const void* buffer, uint32_t len,
1246                        on_as_io_cb writer_cb,
1247                        void* writer_opaque,
1248                        Duration deadline)
1249 {
1250     T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1251       _async_socket_string(as), len, deadline);
1252 
1253     AsyncSocketIO* const asw =
1254         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1255                                  deadline);
1256     /* Add new writer to the list. Note that we use initial reference from I/O
1257      * 'new' routine as "in the list" reference counter. */
1258     if (as->writers_head == NULL) {
1259         as->writers_head = as->writers_tail = asw;
1260     } else {
1261         as->writers_tail->next = asw;
1262         as->writers_tail = asw;
1263     }
1264     loopIo_wantWrite(as->io);
1265 }
1266 
1267 void
async_socket_write_rel(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,int to)1268 async_socket_write_rel(AsyncSocket* as,
1269                        const void* buffer, uint32_t len,
1270                        on_as_io_cb writer_cb,
1271                        void* writer_opaque,
1272                        int to)
1273 {
1274     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1275                                     DURATION_INFINITE;
1276     async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1277 }
1278 
1279 void*
async_socket_get_client_opaque(const AsyncSocket * as)1280 async_socket_get_client_opaque(const AsyncSocket* as)
1281 {
1282     return as->client_opaque;
1283 }
1284 
1285 Duration
async_socket_deadline(AsyncSocket * as,int rel)1286 async_socket_deadline(AsyncSocket* as, int rel)
1287 {
1288     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1289                         DURATION_INFINITE;
1290 }
1291 
1292 int
async_socket_get_port(const AsyncSocket * as)1293 async_socket_get_port(const AsyncSocket* as)
1294 {
1295     return sock_address_get_port(&as->address);
1296 }
1297