1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
23 #include "android/async-socket-connector.h"
24 #include "android/async-socket.h"
25 #include "android/utils/debug.h"
26 #include "android/utils/eintr_wrapper.h"
27 #include "android/utils/panic.h"
28 #include "android/iolooper.h"
29
30 #define E(...) derror(__VA_ARGS__)
31 #define W(...) dwarning(__VA_ARGS__)
32 #define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
33 #define D_ACTIVE VERBOSE_CHECK(asyncsocket)
34
35 #define TRACE_ON 0
36
37 #if TRACE_ON
38 #define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
39 #else
40 #define T(...)
41 #endif
42
43 /********************************************************************************
44 * Asynchronous Socket internal API declarations
45 *******************************************************************************/
46
47 /* Gets socket's address string. */
48 static const char* _async_socket_string(AsyncSocket* as);
49
50 /* Gets socket's looper. */
51 static Looper* _async_socket_get_looper(AsyncSocket* as);
52
53 /* Handler for the I/O time out.
54 * Param:
55 * as - Asynchronous socket for the I/O.
56 * asio - Desciptor for the timed out I/O.
57 */
58 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
59 AsyncSocketIO* asio);
60
61 /********************************************************************************
62 * Asynchronous Socket Reader / Writer
63 *******************************************************************************/
64
65 struct AsyncSocketIO {
66 /* Next I/O in the reader, or writer list. */
67 AsyncSocketIO* next;
68 /* Asynchronous socket for this I/O. */
69 AsyncSocket* as;
70 /* Timer used for time outs on this I/O. */
71 LoopTimer timer[1];
72 /* An opaque pointer associated with this I/O. */
73 void* io_opaque;
74 /* Buffer where to read / write data. */
75 uint8_t* buffer;
76 /* Bytes to transfer through the socket for this I/O. */
77 uint32_t to_transfer;
78 /* Bytes thransferred through the socket in this I/O. */
79 uint32_t transferred;
80 /* I/O callback for this I/O. */
81 on_as_io_cb on_io;
82 /* I/O type selector: 1 - read, 0 - write. */
83 int is_io_read;
84 /* State of the I/O. */
85 AsyncIOState state;
86 /* Number of outstanding references to the I/O. */
87 int ref_count;
88 /* Deadline for this I/O */
89 Duration deadline;
90 };
91
92 /*
93 * Recycling I/O instances.
94 * Since AsyncSocketIO instances are not that large, it makes sence to recycle
95 * them for faster allocation, rather than allocating and freeing them for each
96 * I/O on the socket.
97 */
98
99 /* List of recycled I/O descriptors. */
100 static AsyncSocketIO* _asio_recycled = NULL;
101 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
102 static int _recycled_asio_count = 0;
103 /* Maximum number of I/O descriptors that can be recycled. */
104 static const int _max_recycled_asio_num = 32;
105
106 /* Handler for an I/O time-out timer event.
107 * When this routine is invoked, it indicates that a time out has occurred on an
108 * I/O.
109 * Param:
110 * opaque - AsyncSocketIO instance representing the timed out I/O.
111 */
112 static void _on_async_socket_io_timed_out(void* opaque);
113
114 /* Creates new I/O descriptor.
115 * Param:
116 * as - Asynchronous socket for the I/O.
117 * is_io_read - I/O type selector: 1 - read, 0 - write.
118 * buffer, len - Reader / writer buffer address.
119 * io_cb - Callback for this reader / writer.
120 * io_opaque - An opaque pointer associated with the I/O.
121 * deadline - Deadline to complete the I/O.
122 * Return:
123 * Initialized AsyncSocketIO instance.
124 */
125 static AsyncSocketIO*
_async_socket_rw_new(AsyncSocket * as,int is_io_read,void * buffer,uint32_t len,on_as_io_cb io_cb,void * io_opaque,Duration deadline)126 _async_socket_rw_new(AsyncSocket* as,
127 int is_io_read,
128 void* buffer,
129 uint32_t len,
130 on_as_io_cb io_cb,
131 void* io_opaque,
132 Duration deadline)
133 {
134 /* Lookup in the recycler first. */
135 AsyncSocketIO* asio = _asio_recycled;
136 if (asio != NULL) {
137 /* Pull the descriptor from recycler. */
138 _asio_recycled = asio->next;
139 _recycled_asio_count--;
140 } else {
141 /* No recycled descriptors. Allocate new one. */
142 ANEW0(asio);
143 }
144
145 asio->next = NULL;
146 asio->as = as;
147 asio->is_io_read = is_io_read;
148 asio->buffer = (uint8_t*)buffer;
149 asio->to_transfer = len;
150 asio->transferred = 0;
151 asio->on_io = io_cb;
152 asio->io_opaque = io_opaque;
153 asio->state = ASIO_STATE_QUEUED;
154 asio->ref_count = 1;
155 asio->deadline = deadline;
156 loopTimer_init(asio->timer, _async_socket_get_looper(as),
157 _on_async_socket_io_timed_out, asio);
158 loopTimer_startAbsolute(asio->timer, deadline);
159
160 /* Reference socket that is holding this I/O. */
161 async_socket_reference(as);
162
163 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
164 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
165
166 return asio;
167 }
168
169 /* Destroys and frees I/O descriptor. */
170 static void
_async_socket_io_free(AsyncSocketIO * asio)171 _async_socket_io_free(AsyncSocketIO* asio)
172 {
173 AsyncSocket* const as = asio->as;
174
175 T("ASocket %s: %s I/O descriptor %p is destroyed.",
176 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
177
178 loopTimer_done(asio->timer);
179
180 /* Try to recycle it first, and free the memory if recycler is full. */
181 if (_recycled_asio_count < _max_recycled_asio_num) {
182 asio->next = _asio_recycled;
183 _asio_recycled = asio;
184 _recycled_asio_count++;
185 } else {
186 AFREE(asio);
187 }
188
189 /* Release socket that is holding this I/O. */
190 async_socket_release(as);
191 }
192
193 /* An I/O has been finished and its descriptor is about to be discarded. */
194 static void
_async_socket_io_finished(AsyncSocketIO * asio)195 _async_socket_io_finished(AsyncSocketIO* asio)
196 {
197 /* Notify the client of the I/O that I/O is finished. */
198 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
199 }
200
201 int
async_socket_io_reference(AsyncSocketIO * asio)202 async_socket_io_reference(AsyncSocketIO* asio)
203 {
204 assert(asio->ref_count > 0);
205 asio->ref_count++;
206 return asio->ref_count;
207 }
208
209 int
async_socket_io_release(AsyncSocketIO * asio)210 async_socket_io_release(AsyncSocketIO* asio)
211 {
212 assert(asio->ref_count > 0);
213 asio->ref_count--;
214 if (asio->ref_count == 0) {
215 _async_socket_io_finished(asio);
216 /* Last reference has been dropped. Destroy this object. */
217 _async_socket_io_free(asio);
218 return 0;
219 }
220 return asio->ref_count;
221 }
222
223 /* Creates new asynchronous socket reader.
224 * Param:
225 * as - Asynchronous socket for the reader.
226 * buffer, len - Reader's buffer.
227 * io_cb - Reader's callback.
228 * reader_opaque - An opaque pointer associated with the reader.
229 * deadline - Deadline to complete the operation.
230 * Return:
231 * An initialized AsyncSocketIO intance.
232 */
233 static AsyncSocketIO*
_async_socket_reader_new(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb io_cb,void * reader_opaque,Duration deadline)234 _async_socket_reader_new(AsyncSocket* as,
235 void* buffer,
236 uint32_t len,
237 on_as_io_cb io_cb,
238 void* reader_opaque,
239 Duration deadline)
240 {
241 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
242 reader_opaque, deadline);
243 return asio;
244 }
245
246 /* Creates new asynchronous socket writer.
247 * Param:
248 * as - Asynchronous socket for the writer.
249 * buffer, len - Writer's buffer.
250 * io_cb - Writer's callback.
251 * writer_opaque - An opaque pointer associated with the writer.
252 * deadline - Deadline to complete the operation.
253 * Return:
254 * An initialized AsyncSocketIO intance.
255 */
256 static AsyncSocketIO*
_async_socket_writer_new(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb io_cb,void * writer_opaque,Duration deadline)257 _async_socket_writer_new(AsyncSocket* as,
258 const void* buffer,
259 uint32_t len,
260 on_as_io_cb io_cb,
261 void* writer_opaque,
262 Duration deadline)
263 {
264 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
265 io_cb, writer_opaque,
266 deadline);
267 return asio;
268 }
269
270 /* I/O timed out. */
271 static void
_on_async_socket_io_timed_out(void * opaque)272 _on_async_socket_io_timed_out(void* opaque)
273 {
274 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
275 AsyncSocket* const as = asio->as;
276
277 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
278 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
279 asio->deadline, async_socket_deadline(as, 0));
280
281 /* Reference while in callback. */
282 async_socket_io_reference(asio);
283 _async_socket_io_timed_out(asio->as, asio);
284 async_socket_io_release(asio);
285 }
286
287 /********************************************************************************
288 * Public Asynchronous Socket I/O API
289 *******************************************************************************/
290
291 AsyncSocket*
async_socket_io_get_socket(const AsyncSocketIO * asio)292 async_socket_io_get_socket(const AsyncSocketIO* asio)
293 {
294 async_socket_reference(asio->as);
295 return asio->as;
296 }
297
298 void
async_socket_io_cancel_time_out(AsyncSocketIO * asio)299 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
300 {
301 loopTimer_stop(asio->timer);
302 }
303
304 void*
async_socket_io_get_io_opaque(const AsyncSocketIO * asio)305 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
306 {
307 return asio->io_opaque;
308 }
309
310 void*
async_socket_io_get_client_opaque(const AsyncSocketIO * asio)311 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
312 {
313 return async_socket_get_client_opaque(asio->as);
314 }
315
316 void*
async_socket_io_get_buffer_info(const AsyncSocketIO * asio,uint32_t * transferred,uint32_t * to_transfer)317 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
318 uint32_t* transferred,
319 uint32_t* to_transfer)
320 {
321 if (transferred != NULL) {
322 *transferred = asio->transferred;
323 }
324 if (to_transfer != NULL) {
325 *to_transfer = asio->to_transfer;
326 }
327 return asio->buffer;
328 }
329
330 void*
async_socket_io_get_buffer(const AsyncSocketIO * asio)331 async_socket_io_get_buffer(const AsyncSocketIO* asio)
332 {
333 return asio->buffer;
334 }
335
336 uint32_t
async_socket_io_get_transferred(const AsyncSocketIO * asio)337 async_socket_io_get_transferred(const AsyncSocketIO* asio)
338 {
339 return asio->transferred;
340 }
341
342 uint32_t
async_socket_io_get_to_transfer(const AsyncSocketIO * asio)343 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
344 {
345 return asio->to_transfer;
346 }
347
348 int
async_socket_io_is_read(const AsyncSocketIO * asio)349 async_socket_io_is_read(const AsyncSocketIO* asio)
350 {
351 return asio->is_io_read;
352 }
353
354 /********************************************************************************
355 * Asynchronous Socket internals
356 *******************************************************************************/
357
358 struct AsyncSocket {
359 /* TCP address for the socket. */
360 SockAddress address;
361 /* Connection callback for this socket. */
362 on_as_connection_cb on_connection;
363 /* An opaque pointer associated with this socket by the client. */
364 void* client_opaque;
365 /* I/O looper for asynchronous I/O on the socket. */
366 Looper* looper;
367 /* I/O descriptor for asynchronous I/O on the socket. */
368 LoopIo io[1];
369 /* Timer to use for reconnection attempts. */
370 LoopTimer reconnect_timer[1];
371 /* Head of the list of the active readers. */
372 AsyncSocketIO* readers_head;
373 /* Tail of the list of the active readers. */
374 AsyncSocketIO* readers_tail;
375 /* Head of the list of the active writers. */
376 AsyncSocketIO* writers_head;
377 /* Tail of the list of the active writers. */
378 AsyncSocketIO* writers_tail;
379 /* Socket's file descriptor. */
380 int fd;
381 /* Timeout to use for reconnection attempts. */
382 int reconnect_to;
383 /* Number of outstanding references to the socket. */
384 int ref_count;
385 /* Flags whether (1) or not (0) socket owns the looper. */
386 int owns_looper;
387 };
388
389 static const char*
_async_socket_string(AsyncSocket * as)390 _async_socket_string(AsyncSocket* as)
391 {
392 return sock_address_to_string(&as->address);
393 }
394
395 static Looper*
_async_socket_get_looper(AsyncSocket * as)396 _async_socket_get_looper(AsyncSocket* as)
397 {
398 return as->looper;
399 }
400
401 /* Pulls first reader out of the list.
402 * Param:
403 * as - Initialized AsyncSocket instance.
404 * Return:
405 * First I/O pulled out of the list, or NULL if there are no I/O in the list.
406 * Note that the caller is responsible for releasing the I/O object returned
407 * from this routine.
408 */
409 static AsyncSocketIO*
_async_socket_pull_first_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)410 _async_socket_pull_first_io(AsyncSocket* as,
411 AsyncSocketIO** list_head,
412 AsyncSocketIO** list_tail)
413 {
414 AsyncSocketIO* const ret = *list_head;
415 if (ret != NULL) {
416 *list_head = ret->next;
417 ret->next = NULL;
418 if (*list_head == NULL) {
419 *list_tail = NULL;
420 }
421 }
422 return ret;
423 }
424
425 /* Pulls first reader out of the list.
426 * Param:
427 * as - Initialized AsyncSocket instance.
428 * Return:
429 * First reader pulled out of the list, or NULL if there are no readers in the
430 * list.
431 * Note that the caller is responsible for releasing the I/O object returned
432 * from this routine.
433 */
434 static AsyncSocketIO*
_async_socket_pull_first_reader(AsyncSocket * as)435 _async_socket_pull_first_reader(AsyncSocket* as)
436 {
437 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
438 }
439
440 /* Pulls first writer out of the list.
441 * Param:
442 * as - Initialized AsyncSocket instance.
443 * Return:
444 * First writer pulled out of the list, or NULL if there are no writers in the
445 * list.
446 * Note that the caller is responsible for releasing the I/O object returned
447 * from this routine.
448 */
449 static AsyncSocketIO*
_async_socket_pull_first_writer(AsyncSocket * as)450 _async_socket_pull_first_writer(AsyncSocket* as)
451 {
452 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
453 }
454
455 /* Removes an I/O descriptor from a list of active I/O.
456 * Param:
457 * as - Initialized AsyncSocket instance.
458 * list_head, list_tail - Pointers to the list head and tail.
459 * io - I/O to remove.
460 * Return:
461 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
462 */
463 static int
_async_socket_remove_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail,AsyncSocketIO * io)464 _async_socket_remove_io(AsyncSocket* as,
465 AsyncSocketIO** list_head,
466 AsyncSocketIO** list_tail,
467 AsyncSocketIO* io)
468 {
469 AsyncSocketIO* prev = NULL;
470
471 while (*list_head != NULL && io != *list_head) {
472 prev = *list_head;
473 list_head = &((*list_head)->next);
474 }
475 if (*list_head == NULL) {
476 D("%s: I/O %p is not found in the list for socket '%s'",
477 __FUNCTION__, io, _async_socket_string(as));
478 return 0;
479 }
480
481 *list_head = io->next;
482 if (prev != NULL) {
483 prev->next = io->next;
484 }
485 if (*list_tail == io) {
486 *list_tail = prev;
487 }
488
489 /* Release I/O adjusting reference added when I/O has been saved in the list. */
490 async_socket_io_release(io);
491
492 return 1;
493 }
494
495 /* Advances to the next I/O in the list.
496 * Param:
497 * as - Initialized AsyncSocket instance.
498 * list_head, list_tail - Pointers to the list head and tail.
499 */
500 static void
_async_socket_advance_io(AsyncSocket * as,AsyncSocketIO ** list_head,AsyncSocketIO ** list_tail)501 _async_socket_advance_io(AsyncSocket* as,
502 AsyncSocketIO** list_head,
503 AsyncSocketIO** list_tail)
504 {
505 AsyncSocketIO* first_io = *list_head;
506 if (first_io != NULL) {
507 *list_head = first_io->next;
508 first_io->next = NULL;
509 }
510 if (*list_head == NULL) {
511 *list_tail = NULL;
512 }
513 if (first_io != NULL) {
514 /* Release I/O removed from the head of the list. */
515 async_socket_io_release(first_io);
516 }
517 }
518
519 /* Advances to the next reader in the list.
520 * Param:
521 * as - Initialized AsyncSocket instance.
522 */
523 static void
_async_socket_advance_reader(AsyncSocket * as)524 _async_socket_advance_reader(AsyncSocket* as)
525 {
526 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
527 }
528
529 /* Advances to the next writer in the list.
530 * Param:
531 * as - Initialized AsyncSocket instance.
532 */
533 static void
_async_socket_advance_writer(AsyncSocket * as)534 _async_socket_advance_writer(AsyncSocket* as)
535 {
536 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
537 }
538
539 /* Completes an I/O.
540 * Param:
541 * as - Initialized AsyncSocket instance.
542 * asio - I/O to complete.
543 * Return:
544 * One of AsyncIOAction values.
545 */
546 static AsyncIOAction
_async_socket_complete_io(AsyncSocket * as,AsyncSocketIO * asio)547 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
548 {
549 T("ASocket %s: %s I/O %p is completed.",
550 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
551
552 /* Stop the timer. */
553 async_socket_io_cancel_time_out(asio);
554
555 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
556 }
557
558 /* Timeouts an I/O.
559 * Param:
560 * as - Initialized AsyncSocket instance.
561 * asio - An I/O that has timed out.
562 * Return:
563 * One of AsyncIOAction values.
564 */
565 static AsyncIOAction
_async_socket_io_timed_out(AsyncSocket * as,AsyncSocketIO * asio)566 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
567 {
568 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
569 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
570 asio->deadline, async_socket_deadline(as, 0));
571
572 /* Report to the client. */
573 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
574 ASIO_STATE_TIMED_OUT);
575
576 /* Remove the I/O from a list of active I/O for actions other than retry. */
577 if (action != ASIO_ACTION_RETRY) {
578 if (asio->is_io_read) {
579 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
580 } else {
581 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
582 }
583 }
584
585 return action;
586 }
587
588 /* Cancels an I/O.
589 * Param:
590 * as - Initialized AsyncSocket instance.
591 * asio - An I/O to cancel.
592 * Return:
593 * One of AsyncIOAction values.
594 */
595 static AsyncIOAction
_async_socket_cancel_io(AsyncSocket * as,AsyncSocketIO * asio)596 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
597 {
598 T("ASocket %s: %s I/O %p is cancelled.",
599 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
600
601 /* Stop the timer. */
602 async_socket_io_cancel_time_out(asio);
603
604 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
605 }
606
607 /* Reports an I/O failure.
608 * Param:
609 * as - Initialized AsyncSocket instance.
610 * asio - An I/O that has failed. Can be NULL for general failures.
611 * failure - Failure (errno) that has occurred.
612 * Return:
613 * One of AsyncIOAction values.
614 */
615 static AsyncIOAction
_async_socket_io_failure(AsyncSocket * as,AsyncSocketIO * asio,int failure)616 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
617 {
618 T("ASocket %s: %s I/O %p has failed: %d -> %s",
619 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
620 failure, strerror(failure));
621
622 /* Stop the timer. */
623 async_socket_io_cancel_time_out(asio);
624
625 errno = failure;
626 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
627 }
628
629 /* Cancels all the active socket readers.
630 * Param:
631 * as - Initialized AsyncSocket instance.
632 */
633 static void
_async_socket_cancel_readers(AsyncSocket * as)634 _async_socket_cancel_readers(AsyncSocket* as)
635 {
636 while (as->readers_head != NULL) {
637 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
638 /* We ignore action returned from the cancellation callback, since we're
639 * in a disconnected state here. */
640 _async_socket_cancel_io(as, to_cancel);
641 async_socket_io_release(to_cancel);
642 }
643 }
644
645 /* Cancels all the active socket writers.
646 * Param:
647 * as - Initialized AsyncSocket instance.
648 */
649 static void
_async_socket_cancel_writers(AsyncSocket * as)650 _async_socket_cancel_writers(AsyncSocket* as)
651 {
652 while (as->writers_head != NULL) {
653 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
654 /* We ignore action returned from the cancellation callback, since we're
655 * in a disconnected state here. */
656 _async_socket_cancel_io(as, to_cancel);
657 async_socket_io_release(to_cancel);
658 }
659 }
660
661 /* Cancels all the I/O on the socket. */
662 static void
_async_socket_cancel_all_io(AsyncSocket * as)663 _async_socket_cancel_all_io(AsyncSocket* as)
664 {
665 /* Stop the reconnection timer. */
666 loopTimer_stop(as->reconnect_timer);
667
668 /* Stop read / write on the socket. */
669 loopIo_dontWantWrite(as->io);
670 loopIo_dontWantRead(as->io);
671
672 /* Cancel active readers and writers. */
673 _async_socket_cancel_readers(as);
674 _async_socket_cancel_writers(as);
675 }
676
677 /* Closes socket handle used by the async socket.
678 * Param:
679 * as - Initialized AsyncSocket instance.
680 */
681 static void
_async_socket_close_socket(AsyncSocket * as)682 _async_socket_close_socket(AsyncSocket* as)
683 {
684 if (as->fd >= 0) {
685 T("ASocket %s: Socket handle %d is closed.",
686 _async_socket_string(as), as->fd);
687 loopIo_done(as->io);
688 socket_close(as->fd);
689 as->fd = -1;
690 }
691 }
692
693 /* Destroys AsyncSocket instance.
694 * Param:
695 * as - Initialized AsyncSocket instance.
696 */
697 static void
_async_socket_free(AsyncSocket * as)698 _async_socket_free(AsyncSocket* as)
699 {
700 if (as != NULL) {
701 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
702
703 /* Close socket. */
704 _async_socket_close_socket(as);
705
706 /* Free allocated resources. */
707 if (as->looper != NULL) {
708 loopTimer_done(as->reconnect_timer);
709 if (as->owns_looper) {
710 looper_free(as->looper);
711 }
712 }
713 sock_address_done(&as->address);
714 AFREE(as);
715 }
716 }
717
718 /* Starts reconnection attempts after connection has been lost.
719 * Param:
720 * as - Initialized AsyncSocket instance.
721 * to - Milliseconds to wait before reconnection attempt.
722 */
723 static void
_async_socket_reconnect(AsyncSocket * as,int to)724 _async_socket_reconnect(AsyncSocket* as, int to)
725 {
726 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
727
728 /* Make sure that no I/O is active, and socket is closed before we
729 * reconnect. */
730 _async_socket_cancel_all_io(as);
731
732 /* Set the timer for reconnection attempt. */
733 loopTimer_startRelative(as->reconnect_timer, to);
734 }
735
736 /********************************************************************************
737 * Asynchronous Socket callbacks
738 *******************************************************************************/
739
740 /* A callback that is invoked when socket gets disconnected.
741 * Param:
742 * as - Initialized AsyncSocket instance.
743 */
744 static void
_on_async_socket_disconnected(AsyncSocket * as)745 _on_async_socket_disconnected(AsyncSocket* as)
746 {
747 /* Save error to restore it for the client's callback. */
748 const int save_errno = errno;
749 AsyncIOAction action = ASIO_ACTION_ABORT;
750
751 D("ASocket %s: Disconnected.", _async_socket_string(as));
752
753 /* Cancel all the I/O on this socket. */
754 _async_socket_cancel_all_io(as);
755
756 /* Close the socket. */
757 _async_socket_close_socket(as);
758
759 /* Restore errno, and invoke client's callback. */
760 errno = save_errno;
761 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
762
763 if (action == ASIO_ACTION_RETRY) {
764 /* Client requested reconnection. */
765 _async_socket_reconnect(as, as->reconnect_to);
766 }
767 }
768
769 /* A callback that is invoked on socket's I/O failure.
770 * Param:
771 * as - Initialized AsyncSocket instance.
772 * asio - Descriptor for the failed I/O. Can be NULL for general failures.
773 */
774 static AsyncIOAction
_on_async_socket_failure(AsyncSocket * as,AsyncSocketIO * asio)775 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
776 {
777 D("ASocket %s: %s I/O failure: %d -> %s",
778 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
779 errno, strerror(errno));
780
781 /* Report the failure. */
782 return _async_socket_io_failure(as, asio, errno);
783 }
784
785 /* A callback that is invoked when there is data available to read.
786 * Param:
787 * as - Initialized AsyncSocket instance.
788 * Return:
789 * 0 on success, or -1 on failure. Failure returned from this routine will
790 * skip writes (if awailable) behind this read.
791 */
792 static int
_on_async_socket_recv(AsyncSocket * as)793 _on_async_socket_recv(AsyncSocket* as)
794 {
795 AsyncIOAction action;
796
797 /* Get current reader. */
798 AsyncSocketIO* const asr = as->readers_head;
799 if (asr == NULL) {
800 D("ASocket %s: No reader is available.", _async_socket_string(as));
801 loopIo_dontWantRead(as->io);
802 return 0;
803 }
804
805 /* Reference the reader while we're working with it in this callback. */
806 async_socket_io_reference(asr);
807
808 /* Bump I/O state, and inform the client that I/O is in progress. */
809 if (asr->state == ASIO_STATE_QUEUED) {
810 asr->state = ASIO_STATE_STARTED;
811 } else {
812 asr->state = ASIO_STATE_CONTINUES;
813 }
814 action = asr->on_io(asr->io_opaque, asr, asr->state);
815 if (action == ASIO_ACTION_ABORT) {
816 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
817 /* Move on to the next reader. */
818 _async_socket_advance_reader(as);
819 /* Lets see if there are still active readers, and enable, or disable
820 * read I/O callback accordingly. */
821 if (as->readers_head != NULL) {
822 loopIo_wantRead(as->io);
823 } else {
824 loopIo_dontWantRead(as->io);
825 }
826 async_socket_io_release(asr);
827 return 0;
828 }
829
830 /* Read next chunk of data. */
831 int res = HANDLE_EINTR(
832 socket_recv(as->fd,
833 asr->buffer + asr->transferred,
834 asr->to_transfer - asr->transferred));
835 if (res == 0) {
836 /* Socket has been disconnected. */
837 errno = ECONNRESET;
838 _on_async_socket_disconnected(as);
839 async_socket_io_release(asr);
840 return -1;
841 }
842
843 if (res < 0) {
844 if (errno == EWOULDBLOCK || errno == EAGAIN) {
845 /* Yield to writes behind this read. */
846 loopIo_wantRead(as->io);
847 async_socket_io_release(asr);
848 return 0;
849 }
850
851 /* An I/O error. */
852 action = _on_async_socket_failure(as, asr);
853 if (action != ASIO_ACTION_RETRY) {
854 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
855 /* Move on to the next reader. */
856 _async_socket_advance_reader(as);
857 /* Lets see if there are still active readers, and enable, or disable
858 * read I/O callback accordingly. */
859 if (as->readers_head != NULL) {
860 loopIo_wantRead(as->io);
861 } else {
862 loopIo_dontWantRead(as->io);
863 }
864 }
865 async_socket_io_release(asr);
866 return -1;
867 }
868
869 /* Update the reader's descriptor. */
870 asr->transferred += res;
871 if (asr->transferred == asr->to_transfer) {
872 /* This read is completed. Move on to the next reader. */
873 _async_socket_advance_reader(as);
874
875 /* Notify reader completion. */
876 _async_socket_complete_io(as, asr);
877 }
878
879 /* Lets see if there are still active readers, and enable, or disable read
880 * I/O callback accordingly. */
881 if (as->readers_head != NULL) {
882 loopIo_wantRead(as->io);
883 } else {
884 loopIo_dontWantRead(as->io);
885 }
886
887 async_socket_io_release(asr);
888
889 return 0;
890 }
891
892 /* A callback that is invoked when there is data available to write.
893 * Param:
894 * as - Initialized AsyncSocket instance.
895 * Return:
896 * 0 on success, or -1 on failure. Failure returned from this routine will
897 * skip reads (if awailable) behind this write.
898 */
899 static int
_on_async_socket_send(AsyncSocket * as)900 _on_async_socket_send(AsyncSocket* as)
901 {
902 AsyncIOAction action;
903
904 /* Get current writer. */
905 AsyncSocketIO* const asw = as->writers_head;
906 if (asw == NULL) {
907 D("ASocket %s: No writer is available.", _async_socket_string(as));
908 loopIo_dontWantWrite(as->io);
909 return 0;
910 }
911
912 /* Reference the writer while we're working with it in this callback. */
913 async_socket_io_reference(asw);
914
915 /* Bump I/O state, and inform the client that I/O is in progress. */
916 if (asw->state == ASIO_STATE_QUEUED) {
917 asw->state = ASIO_STATE_STARTED;
918 } else {
919 asw->state = ASIO_STATE_CONTINUES;
920 }
921 action = asw->on_io(asw->io_opaque, asw, asw->state);
922 if (action == ASIO_ACTION_ABORT) {
923 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
924 /* Move on to the next writer. */
925 _async_socket_advance_writer(as);
926 /* Lets see if there are still active writers, and enable, or disable
927 * write I/O callback accordingly. */
928 if (as->writers_head != NULL) {
929 loopIo_wantWrite(as->io);
930 } else {
931 loopIo_dontWantWrite(as->io);
932 }
933 async_socket_io_release(asw);
934 return 0;
935 }
936
937 /* Write next chunk of data. */
938 int res = HANDLE_EINTR(
939 socket_send(as->fd,
940 asw->buffer + asw->transferred,
941 asw->to_transfer - asw->transferred));
942 if (res == 0) {
943 /* Socket has been disconnected. */
944 errno = ECONNRESET;
945 _on_async_socket_disconnected(as);
946 async_socket_io_release(asw);
947 return -1;
948 }
949
950 if (res < 0) {
951 if (errno == EWOULDBLOCK || errno == EAGAIN) {
952 /* Yield to reads behind this write. */
953 loopIo_wantWrite(as->io);
954 async_socket_io_release(asw);
955 return 0;
956 }
957
958 /* An I/O error. */
959 action = _on_async_socket_failure(as, asw);
960 if (action != ASIO_ACTION_RETRY) {
961 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
962 /* Move on to the next writer. */
963 _async_socket_advance_writer(as);
964 /* Lets see if there are still active writers, and enable, or disable
965 * write I/O callback accordingly. */
966 if (as->writers_head != NULL) {
967 loopIo_wantWrite(as->io);
968 } else {
969 loopIo_dontWantWrite(as->io);
970 }
971 }
972 async_socket_io_release(asw);
973 return -1;
974 }
975
976 /* Update the writer descriptor. */
977 asw->transferred += res;
978 if (asw->transferred == asw->to_transfer) {
979 /* This write is completed. Move on to the next writer. */
980 _async_socket_advance_writer(as);
981
982 /* Notify writer completion. */
983 _async_socket_complete_io(as, asw);
984 }
985
986 /* Lets see if there are still active writers, and enable, or disable write
987 * I/O callback accordingly. */
988 if (as->writers_head != NULL) {
989 loopIo_wantWrite(as->io);
990 } else {
991 loopIo_dontWantWrite(as->io);
992 }
993
994 async_socket_io_release(asw);
995
996 return 0;
997 }
998
999 /* A callback that is invoked when an I/O is available on socket.
1000 * Param:
1001 * as - Initialized AsyncSocket instance.
1002 * fd - Socket's file descriptor.
1003 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1004 */
1005 static void
_on_async_socket_io(void * opaque,int fd,unsigned events)1006 _on_async_socket_io(void* opaque, int fd, unsigned events)
1007 {
1008 AsyncSocket* const as = (AsyncSocket*)opaque;
1009
1010 /* Reference the socket while we're working with it in this callback. */
1011 async_socket_reference(as);
1012
1013 if ((events & LOOP_IO_READ) != 0) {
1014 if (_on_async_socket_recv(as) != 0) {
1015 async_socket_release(as);
1016 return;
1017 }
1018 }
1019
1020 if ((events & LOOP_IO_WRITE) != 0) {
1021 if (_on_async_socket_send(as) != 0) {
1022 async_socket_release(as);
1023 return;
1024 }
1025 }
1026
1027 async_socket_release(as);
1028 }
1029
1030 /* A callback that is invoked by asynchronous socket connector on connection
1031 * events.
1032 * Param:
1033 * opaque - Initialized AsyncSocket instance.
1034 * connector - Connector that is used to connect this socket.
1035 * event - Connection event.
1036 * Return:
1037 * One of AsyncIOAction values.
1038 */
1039 static AsyncIOAction
_on_connector_events(void * opaque,AsyncSocketConnector * connector,AsyncIOState event)1040 _on_connector_events(void* opaque,
1041 AsyncSocketConnector* connector,
1042 AsyncIOState event)
1043 {
1044 AsyncIOAction action;
1045 AsyncSocket* const as = (AsyncSocket*)opaque;
1046
1047 /* Reference the socket while we're working with it in this callback. */
1048 async_socket_reference(as);
1049
1050 if (event == ASIO_STATE_SUCCEEDED) {
1051 /* Accept the connection. */
1052 as->fd = async_socket_connector_pull_fd(connector);
1053 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
1054 }
1055
1056 /* Invoke client's callback. */
1057 action = as->on_connection(as->client_opaque, as, event);
1058 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
1059 /* For whatever reason the client didn't want to keep this connection.
1060 * Close it. */
1061 D("ASocket %s: Connection is discarded by the client.",
1062 _async_socket_string(as));
1063 _async_socket_close_socket(as);
1064 }
1065
1066 if (action != ASIO_ACTION_RETRY) {
1067 async_socket_connector_release(connector);
1068 }
1069
1070 async_socket_release(as);
1071
1072 return action;
1073 }
1074
1075 /* Timer callback invoked to reconnect the lost connection.
1076 * Param:
1077 * as - Initialized AsyncSocket instance.
1078 */
1079 void
_on_async_socket_reconnect(void * opaque)1080 _on_async_socket_reconnect(void* opaque)
1081 {
1082 AsyncSocket* as = (AsyncSocket*)opaque;
1083
1084 /* Reference the socket while we're working with it in this callback. */
1085 async_socket_reference(as);
1086 async_socket_connect(as, as->reconnect_to);
1087 async_socket_release(as);
1088 }
1089
1090
1091 /********************************************************************************
1092 * Android Device Socket public API
1093 *******************************************************************************/
1094
1095 AsyncSocket*
async_socket_new(int port,int reconnect_to,on_as_connection_cb client_cb,void * client_opaque,Looper * looper)1096 async_socket_new(int port,
1097 int reconnect_to,
1098 on_as_connection_cb client_cb,
1099 void* client_opaque,
1100 Looper* looper)
1101 {
1102 AsyncSocket* as;
1103
1104 if (client_cb == NULL) {
1105 E("Invalid client_cb parameter");
1106 return NULL;
1107 }
1108
1109 ANEW0(as);
1110
1111 as->fd = -1;
1112 as->client_opaque = client_opaque;
1113 as->on_connection = client_cb;
1114 as->readers_head = as->readers_tail = NULL;
1115 as->reconnect_to = reconnect_to;
1116 as->ref_count = 1;
1117 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
1118 if (looper == NULL) {
1119 as->looper = looper_newCore();
1120 if (as->looper == NULL) {
1121 E("Unable to create I/O looper for async socket '%s'",
1122 _async_socket_string(as));
1123 client_cb(client_opaque, as, ASIO_STATE_FAILED);
1124 _async_socket_free(as);
1125 return NULL;
1126 }
1127 as->owns_looper = 1;
1128 } else {
1129 as->looper = looper;
1130 as->owns_looper = 0;
1131 }
1132
1133 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1134
1135 T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1136
1137 return as;
1138 }
1139
1140 int
async_socket_reference(AsyncSocket * as)1141 async_socket_reference(AsyncSocket* as)
1142 {
1143 assert(as->ref_count > 0);
1144 as->ref_count++;
1145 return as->ref_count;
1146 }
1147
1148 int
async_socket_release(AsyncSocket * as)1149 async_socket_release(AsyncSocket* as)
1150 {
1151 assert(as->ref_count > 0);
1152 as->ref_count--;
1153 if (as->ref_count == 0) {
1154 /* Last reference has been dropped. Destroy this object. */
1155 _async_socket_cancel_all_io(as);
1156 _async_socket_free(as);
1157 return 0;
1158 }
1159 return as->ref_count;
1160 }
1161
1162 void
async_socket_connect(AsyncSocket * as,int retry_to)1163 async_socket_connect(AsyncSocket* as, int retry_to)
1164 {
1165 T("ASocket %s: Handling connection request for %dms...",
1166 _async_socket_string(as), retry_to);
1167
1168 AsyncSocketConnector* const connector =
1169 async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1170 as, as->looper);
1171 if (connector != NULL) {
1172 async_socket_connector_connect(connector);
1173 } else {
1174 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
1175 }
1176 }
1177
1178 void
async_socket_disconnect(AsyncSocket * as)1179 async_socket_disconnect(AsyncSocket* as)
1180 {
1181 T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1182
1183 if (as != NULL) {
1184 _async_socket_cancel_all_io(as);
1185 _async_socket_close_socket(as);
1186 }
1187 }
1188
1189 void
async_socket_reconnect(AsyncSocket * as,int retry_to)1190 async_socket_reconnect(AsyncSocket* as, int retry_to)
1191 {
1192 T("ASocket %s: Handling reconnection request for %dms...",
1193 _async_socket_string(as), retry_to);
1194
1195 _async_socket_cancel_all_io(as);
1196 _async_socket_close_socket(as);
1197 _async_socket_reconnect(as, retry_to);
1198 }
1199
1200 void
async_socket_read_abs(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,Duration deadline)1201 async_socket_read_abs(AsyncSocket* as,
1202 void* buffer, uint32_t len,
1203 on_as_io_cb reader_cb,
1204 void* reader_opaque,
1205 Duration deadline)
1206 {
1207 T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1208 _async_socket_string(as), len, deadline);
1209
1210 AsyncSocketIO* const asr =
1211 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1212 deadline);
1213 if (async_socket_is_connected(as)) {
1214 /* Add new reader to the list. Note that we use initial reference from I/O
1215 * 'new' routine as "in the list" reference counter. */
1216 if (as->readers_head == NULL) {
1217 as->readers_head = as->readers_tail = asr;
1218 } else {
1219 as->readers_tail->next = asr;
1220 as->readers_tail = asr;
1221 }
1222 loopIo_wantRead(as->io);
1223 } else {
1224 D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
1225 errno = ECONNRESET;
1226 reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
1227 async_socket_io_release(asr);
1228 }
1229 }
1230
1231 void
async_socket_read_rel(AsyncSocket * as,void * buffer,uint32_t len,on_as_io_cb reader_cb,void * reader_opaque,int to)1232 async_socket_read_rel(AsyncSocket* as,
1233 void* buffer, uint32_t len,
1234 on_as_io_cb reader_cb,
1235 void* reader_opaque,
1236 int to)
1237 {
1238 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1239 DURATION_INFINITE;
1240 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
1241 }
1242
1243 void
async_socket_write_abs(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,Duration deadline)1244 async_socket_write_abs(AsyncSocket* as,
1245 const void* buffer, uint32_t len,
1246 on_as_io_cb writer_cb,
1247 void* writer_opaque,
1248 Duration deadline)
1249 {
1250 T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1251 _async_socket_string(as), len, deadline);
1252
1253 AsyncSocketIO* const asw =
1254 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1255 deadline);
1256 if (async_socket_is_connected(as)) {
1257 /* Add new writer to the list. Note that we use initial reference from I/O
1258 * 'new' routine as "in the list" reference counter. */
1259 if (as->writers_head == NULL) {
1260 as->writers_head = as->writers_tail = asw;
1261 } else {
1262 as->writers_tail->next = asw;
1263 as->writers_tail = asw;
1264 }
1265 loopIo_wantWrite(as->io);
1266 } else {
1267 D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
1268 errno = ECONNRESET;
1269 writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
1270 async_socket_io_release(asw);
1271 }
1272 }
1273
1274 void
async_socket_write_rel(AsyncSocket * as,const void * buffer,uint32_t len,on_as_io_cb writer_cb,void * writer_opaque,int to)1275 async_socket_write_rel(AsyncSocket* as,
1276 const void* buffer, uint32_t len,
1277 on_as_io_cb writer_cb,
1278 void* writer_opaque,
1279 int to)
1280 {
1281 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1282 DURATION_INFINITE;
1283 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1284 }
1285
1286 void*
async_socket_get_client_opaque(const AsyncSocket * as)1287 async_socket_get_client_opaque(const AsyncSocket* as)
1288 {
1289 return as->client_opaque;
1290 }
1291
1292 Duration
async_socket_deadline(AsyncSocket * as,int rel)1293 async_socket_deadline(AsyncSocket* as, int rel)
1294 {
1295 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1296 DURATION_INFINITE;
1297 }
1298
1299 int
async_socket_get_port(const AsyncSocket * as)1300 async_socket_get_port(const AsyncSocket* as)
1301 {
1302 return sock_address_get_port(&as->address);
1303 }
1304
1305 int
async_socket_is_connected(const AsyncSocket * as)1306 async_socket_is_connected(const AsyncSocket* as)
1307 {
1308 return as->fd >= 0;
1309 }
1310