• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2007 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "mq"
18 
19 #include <assert.h>
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <pthread.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <unistd.h>
26 
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <sys/un.h>
30 #include <sys/uio.h>
31 
32 #include <cutils/array.h>
33 #include <cutils/hashmap.h>
34 #include <cutils/selector.h>
35 
36 #include "loghack.h"
37 #include "buffer.h"
38 
/** How many PIDs of recently dead peers are kept around. */
#define PEER_HISTORY (16)

/* Short names for the generic and the UNIX-domain socket address structs. */
typedef struct sockaddr SocketAddress;
typedef struct sockaddr_un UnixAddress;
44 
/**
 * Identity of a process: process, user and group ID. A private struct is used
 * rather than ucred because ucred is only available on Linux.
 */
typedef struct {
    pid_t pid; /* process ID */
    uid_t uid; /* user ID */
    gid_t gid; /* group ID */
} Credentials;
54 
55 /** Listens for bytes coming from remote peers. */
56 typedef void BytesListener(Credentials credentials, char* bytes, size_t size);
57 
58 /** Listens for the deaths of remote peers. */
59 typedef void DeathListener(pid_t pid);
60 
/** Kinds of packets exchanged between peers. */
typedef enum {
    /** Asks for a connection to another peer. */
    CONNECTION_REQUEST,

    /** Carries a connection to another peer. */
    CONNECTION,

    /** Says that a connection attempt failed. */
    CONNECTION_ERROR,

    /** Carries a generic payload of bytes. */
    BYTES,
} PacketType;
75 
/** What a peer proxy is currently reading from its remote peer. */
typedef enum {
    /** A packet header is being read. */
    READING_HEADER,

    /** A connection from the master is awaited. */
    ACCEPTING_CONNECTION,

    /** Packet bytes are being read. */
    READING_BYTES,
} InputState;
86 
87 /** A packet header. */
88 // TODO: Use custom headers for master->peer, peer->master, peer->peer.
89 typedef struct {
90     PacketType type;
91     union {
92         /** Packet size. Used for BYTES. */
93         size_t size;
94 
95         /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */
96         Credentials credentials;
97     };
98 } Header;
99 
100 /** A packet which will be sent to a peer. */
101 typedef struct OutgoingPacket OutgoingPacket;
102 struct OutgoingPacket {
103     /** Packet header. */
104     Header header;
105 
106     union {
107         /** Connection to peer. Used with CONNECTION. */
108         int socket;
109 
110         /** Buffer of bytes. Used with BYTES. */
111         Buffer* bytes;
112     };
113 
114     /** Frees all resources associated with this packet. */
115     void (*free)(OutgoingPacket* packet);
116 
117     /** Optional context. */
118     void* context;
119 
120     /** Next packet in the queue. */
121     OutgoingPacket* nextPacket;
122 };
123 
124 /** Represents a remote peer. */
125 typedef struct PeerProxy PeerProxy;
126 
127 /** Local peer state. You typically have one peer per process. */
128 typedef struct {
129     /** This peer's PID. */
130     pid_t pid;
131 
132     /**
133      * Map from pid to peer proxy. The peer has a peer proxy for each remote
134      * peer it's connected to.
135      *
136      * Acquire mutex before use.
137      */
138     Hashmap* peerProxies;
139 
140     /** Manages I/O. */
141     Selector* selector;
142 
143     /** Used to synchronize operations with the selector thread. */
144     pthread_mutex_t mutex;
145 
146     /** Is this peer the master? */
147     bool master;
148 
149     /** Peer proxy for the master. */
150     PeerProxy* masterProxy;
151 
152     /** Listens for packets from remote peers. */
153     BytesListener* onBytes;
154 
155     /** Listens for deaths of remote peers. */
156     DeathListener* onDeath;
157 
158     /** Keeps track of recently dead peers. Requires mutex. */
159     pid_t deadPeers[PEER_HISTORY];
160     size_t deadPeerCursor;
161 } Peer;
162 
163 struct PeerProxy {
164     /** Credentials of the remote process. */
165     Credentials credentials;
166 
167     /** Keeps track of data coming in from the remote peer. */
168     InputState inputState;
169     Buffer* inputBuffer;
170     PeerProxy* connecting;
171 
172     /** File descriptor for this peer. */
173     SelectableFd* fd;
174 
175     /**
176      * Queue of packets to be written out to the remote peer.
177      *
178      * Requires mutex.
179      */
180     // TODO: Limit queue length.
181     OutgoingPacket* currentPacket;
182     OutgoingPacket* lastPacket;
183 
184     /** Used to write outgoing header. */
185     Buffer outgoingHeader;
186 
187     /** True if this is the master's proxy. */
188     bool master;
189 
190     /** Reference back to the local peer. */
191     Peer* peer;
192 
193     /**
194      * Used in master only. Maps this peer proxy to other peer proxies to
195      * which the peer has been connected to. Maps pid to PeerProxy. Helps
196      * keep track of which connections we've sent to whom.
197      */
198     Hashmap* connections;
199 };
200 
201 /** Server socket path. */
202 static const char* MASTER_PATH = "/master.peer";
203 
204 /** Credentials of the master peer. */
205 static const Credentials MASTER_CREDENTIALS = {0, 0, 0};
206 
207 /** Creates a peer proxy and adds it to the peer proxy map. */
208 static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
209 
/**
 * Switches a descriptor to non-blocking mode by adding O_NONBLOCK to its
 * current file status flags. A failing fcntl() call is logged as fatal.
 */
static void setNonBlocking(int fd) {
    int currentFlags = fcntl(fd, F_GETFL, 0);
    if (currentFlags < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }

    int result = fcntl(fd, F_SETFL, currentFlags | O_NONBLOCK);
    if (result < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }
}
220 
/** Closes the descriptor; a failed close() only produces a warning. */
static void closeWithWarning(int fd) {
    if (close(fd) == -1) {
        LOGW("close() error: %s", strerror(errno));
    }
}
228 
/** Hash function for keys that point at a pid_t: the pid itself is the hash. */
static int pidHash(void* key) {
    return (int) *(pid_t*) key;
}
234 
/** Equality function for keys that point at a pid_t. */
static bool pidEquals(void* keyA, void* keyB) {
    pid_t left = *(pid_t*) keyA;
    pid_t right = *(pid_t*) keyB;
    return left == right;
}
241 
242 /** Gets the master address. Not thread safe. */
getMasterAddress()243 static UnixAddress* getMasterAddress() {
244     static UnixAddress masterAddress;
245     static bool initialized = false;
246     if (initialized == false) {
247         masterAddress.sun_family = AF_LOCAL;
248         strcpy(masterAddress.sun_path, MASTER_PATH);
249         initialized = true;
250     }
251     return &masterAddress;
252 }
253 
254 /** Gets exclusive access to the peer for this thread. */
peerLock(Peer * peer)255 static void peerLock(Peer* peer) {
256     pthread_mutex_lock(&peer->mutex);
257 }
258 
259 /** Releases exclusive access to the peer. */
peerUnlock(Peer * peer)260 static void peerUnlock(Peer* peer) {
261     pthread_mutex_unlock(&peer->mutex);
262 }
263 
264 /** Frees a simple, i.e. header-only, outgoing packet. */
outgoingPacketFree(OutgoingPacket * packet)265 static void outgoingPacketFree(OutgoingPacket* packet) {
266     LOGD("Freeing outgoing packet.");
267 	free(packet);
268 }
269 
270 /**
271  * Prepare to read a new packet from the peer.
272  */
peerProxyExpectHeader(PeerProxy * peerProxy)273 static void peerProxyExpectHeader(PeerProxy* peerProxy) {
274     peerProxy->inputState = READING_HEADER;
275     bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));
276 }
277 
278 /** Sets up the buffer for the outgoing header. */
peerProxyPrepareOutgoingHeader(PeerProxy * peerProxy)279 static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
280     peerProxy->outgoingHeader.data
281         = (char*) &(peerProxy->currentPacket->header);
282     peerProxy->outgoingHeader.size = sizeof(Header);
283     bufferPrepareForWrite(&peerProxy->outgoingHeader);
284 }
285 
286 /** Adds a packet to the end of the queue. Callers must have the mutex. */
peerProxyEnqueueOutgoingPacket(PeerProxy * peerProxy,OutgoingPacket * newPacket)287 static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
288         OutgoingPacket* newPacket) {
289     newPacket->nextPacket = NULL; // Just in case.
290     if (peerProxy->currentPacket == NULL) {
291         // The queue is empty.
292         peerProxy->currentPacket = newPacket;
293         peerProxy->lastPacket = newPacket;
294 
295         peerProxyPrepareOutgoingHeader(peerProxy);
296     } else {
297         peerProxy->lastPacket->nextPacket = newPacket;
298     }
299 }
300 
301 /** Takes the peer lock and enqueues the given packet. */
peerProxyLockAndEnqueueOutgoingPacket(PeerProxy * peerProxy,OutgoingPacket * newPacket)302 static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
303         OutgoingPacket* newPacket) {
304     Peer* peer = peerProxy->peer;
305     peerLock(peer);
306     peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
307     peerUnlock(peer);
308 }
309 
310 /**
311  * Frees current packet and moves to the next one. Returns true if there is
312  * a next packet or false if the queue is empty.
313  */
peerProxyNextPacket(PeerProxy * peerProxy)314 static bool peerProxyNextPacket(PeerProxy* peerProxy) {
315     Peer* peer = peerProxy->peer;
316     peerLock(peer);
317 
318     OutgoingPacket* current = peerProxy->currentPacket;
319 
320     if (current == NULL) {
321     	// The queue is already empty.
322         peerUnlock(peer);
323         return false;
324     }
325 
326     OutgoingPacket* next = current->nextPacket;
327     peerProxy->currentPacket = next;
328     current->nextPacket = NULL;
329     current->free(current);
330     if (next == NULL) {
331         // The queue is empty.
332         peerProxy->lastPacket = NULL;
333         peerUnlock(peer);
334         return false;
335     } else {
336         peerUnlock(peer);
337         peerProxyPrepareOutgoingHeader(peerProxy);
338 
339         // TODO: Start writing next packet? It would reduce the number of
340         // system calls, but we could also starve other peers.
341         return true;
342     }
343 }
344 
345 /**
346  * Checks whether a peer died recently.
347  */
peerIsDead(Peer * peer,pid_t pid)348 static bool peerIsDead(Peer* peer, pid_t pid) {
349     size_t i;
350     for (i = 0; i < PEER_HISTORY; i++) {
351         pid_t deadPeer = peer->deadPeers[i];
352         if (deadPeer == 0) {
353             return false;
354         }
355         if (deadPeer == pid) {
356             return true;
357         }
358     }
359     return false;
360 }
361 
362 /**
363  * Cleans up connection information.
364  */
peerProxyRemoveConnection(void * key,void * value,void * context)365 static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
366     PeerProxy* deadPeer = (PeerProxy*) context;
367     PeerProxy* otherPeer = (PeerProxy*) value;
368     hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));
369     return true;
370 }
371 
372 /**
373  * Called when the peer dies.
374  */
peerProxyKill(PeerProxy * peerProxy,bool errnoIsSet)375 static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
376     if (errnoIsSet) {
377         LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,
378                 strerror(errno));
379     } else {
380         LOGI("Peer %d died.", peerProxy->credentials.pid);
381     }
382 
383     // If we lost the master, we're up a creek. We can't let this happen.
384     if (peerProxy->master) {
385         LOG_ALWAYS_FATAL("Lost connection to master.");
386     }
387 
388     Peer* localPeer = peerProxy->peer;
389     pid_t pid = peerProxy->credentials.pid;
390 
391     peerLock(localPeer);
392 
393     // Remember for awhile that the peer died.
394     localPeer->deadPeers[localPeer->deadPeerCursor]
395         = peerProxy->credentials.pid;
396     localPeer->deadPeerCursor++;
397     if (localPeer->deadPeerCursor == PEER_HISTORY) {
398         localPeer->deadPeerCursor = 0;
399     }
400 
401     // Remove from peer map.
402     hashmapRemove(localPeer->peerProxies, &pid);
403 
404     // External threads can no longer get to this peer proxy, so we don't
405     // need the lock anymore.
406     peerUnlock(localPeer);
407 
408     // Remove the fd from the selector.
409     if (peerProxy->fd != NULL) {
410         peerProxy->fd->remove = true;
411     }
412 
413     // Clear outgoing packet queue.
414     while (peerProxyNextPacket(peerProxy)) {}
415 
416     bufferFree(peerProxy->inputBuffer);
417 
418     // This only applies to the master.
419     if (peerProxy->connections != NULL) {
420         // We can't leave these other maps pointing to freed memory.
421         hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,
422                 peerProxy);
423         hashmapFree(peerProxy->connections);
424     }
425 
426     // Invoke death listener.
427     localPeer->onDeath(pid);
428 
429     // Free the peer proxy itself.
430     free(peerProxy);
431 }
432 
/**
 * Reacts to a failed I/O call on the proxy's descriptor, based on errno.
 *
 * EINTR and EAGAIN/EWOULDBLOCK are logged and otherwise ignored. Any other
 * error kills the proxy, after which it must not be used by the caller.
 *
 * functionName is the name of the call that failed; used for logging only.
 */
static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) {
    // Capture errno right away: the logging calls below may change it.
    int savedErrno = errno;

    if (savedErrno == EINTR) {
        // Log interruptions but otherwise ignore them.
        LOGW("%s() interrupted.", functionName);
    } else if (savedErrno == EAGAIN || savedErrno == EWOULDBLOCK) {
        // The two values may differ on some systems; both mean "try later".
        LOGD("EWOULDBLOCK");
        // Ignore.
    } else {
        LOGW("Error returned by %s().", functionName);
        // peerProxyKill() reports strerror(errno); hand it the original.
        errno = savedErrno;
        peerProxyKill(peerProxy, true);
    }
}
445 
446 /**
447  * Buffers output sent to a peer. May be called multiple times until the entire
448  * buffer is filled. Returns true when the buffer is empty.
449  */
peerProxyWriteFromBuffer(PeerProxy * peerProxy,Buffer * outgoing)450 static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
451     ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);
452     if (size < 0) {
453         peerProxyHandleError(peerProxy, "write");
454         return false;
455     } else {
456         return bufferWriteComplete(outgoing);
457     }
458 }
459 
460 /** Writes packet bytes to peer. */
peerProxyWriteBytes(PeerProxy * peerProxy)461 static void peerProxyWriteBytes(PeerProxy* peerProxy) {
462 	Buffer* buffer = peerProxy->currentPacket->bytes;
463 	if (peerProxyWriteFromBuffer(peerProxy, buffer)) {
464         LOGD("Bytes written.");
465         peerProxyNextPacket(peerProxy);
466     }
467 }
468 
469 /** Sends a socket to the peer. */
peerProxyWriteConnection(PeerProxy * peerProxy)470 static void peerProxyWriteConnection(PeerProxy* peerProxy) {
471     int socket = peerProxy->currentPacket->socket;
472 
473     // Why does sending and receiving fds have to be such a PITA?
474     struct msghdr msg;
475     struct iovec iov[1];
476 
477     union {
478         struct cmsghdr cm;
479         char control[CMSG_SPACE(sizeof(int))];
480     } control_un;
481 
482     struct cmsghdr *cmptr;
483 
484     msg.msg_control = control_un.control;
485     msg.msg_controllen = sizeof(control_un.control);
486     cmptr = CMSG_FIRSTHDR(&msg);
487     cmptr->cmsg_len = CMSG_LEN(sizeof(int));
488     cmptr->cmsg_level = SOL_SOCKET;
489     cmptr->cmsg_type = SCM_RIGHTS;
490 
491     // Store the socket in the message.
492     *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;
493 
494     msg.msg_name = NULL;
495     msg.msg_namelen = 0;
496     iov[0].iov_base = "";
497     iov[0].iov_len = 1;
498     msg.msg_iov = iov;
499     msg.msg_iovlen = 1;
500 
501     ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
502 
503     if (result < 0) {
504         peerProxyHandleError(peerProxy, "sendmsg");
505     } else {
506         // Success. Queue up the next packet.
507         peerProxyNextPacket(peerProxy);
508 
509     }
510 }
511 
512 /**
513  * Writes some outgoing data.
514  */
peerProxyWrite(SelectableFd * fd)515 static void peerProxyWrite(SelectableFd* fd) {
516     // TODO: Try to write header and body with one system call.
517 
518     PeerProxy* peerProxy = (PeerProxy*) fd->data;
519     OutgoingPacket* current = peerProxy->currentPacket;
520 
521     if (current == NULL) {
522         // We have nothing left to write.
523         return;
524     }
525 
526     // Write the header.
527     Buffer* outgoingHeader = &peerProxy->outgoingHeader;
528     bool headerWritten = bufferWriteComplete(outgoingHeader);
529     if (!headerWritten) {
530         LOGD("Writing header...");
531         headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);
532         if (headerWritten) {
533             LOGD("Header written.");
534         }
535     }
536 
537     // Write body.
538     if (headerWritten) {
539         PacketType type = current->header.type;
540         switch (type) {
541             case CONNECTION:
542                 peerProxyWriteConnection(peerProxy);
543                 break;
544             case BYTES:
545                 peerProxyWriteBytes(peerProxy);
546                 break;
547             case CONNECTION_REQUEST:
548             case CONNECTION_ERROR:
549                 // These packets consist solely of a header.
550                 peerProxyNextPacket(peerProxy);
551                 break;
552             default:
553                 LOG_ALWAYS_FATAL("Unknown packet type: %d", type);
554         }
555     }
556 }
557 
558 /**
559  * Sets up a peer proxy's fd before we try to select() it.
560  */
peerProxyBeforeSelect(SelectableFd * fd)561 static void peerProxyBeforeSelect(SelectableFd* fd) {
562     LOGD("Before select...");
563 
564     PeerProxy* peerProxy = (PeerProxy*) fd->data;
565 
566     peerLock(peerProxy->peer);
567     bool hasPackets = peerProxy->currentPacket != NULL;
568     peerUnlock(peerProxy->peer);
569 
570     if (hasPackets) {
571         LOGD("Packets found. Setting onWritable().");
572 
573         fd->onWritable = &peerProxyWrite;
574     } else {
575         // We have nothing to write.
576         fd->onWritable = NULL;
577     }
578 }
579 
580 /** Prepare to read bytes from the peer. */
peerProxyExpectBytes(PeerProxy * peerProxy,Header * header)581 static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
582 	LOGD("Expecting %d bytes.", header->size);
583 
584 	peerProxy->inputState = READING_BYTES;
585     if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {
586         LOGW("Couldn't allocate memory for incoming data. Size: %u",
587                 (unsigned int) header->size);
588 
589         // TODO: Ignore the packet and log a warning?
590         peerProxyKill(peerProxy, false);
591     }
592 }
593 
594 /**
595  * Gets a peer proxy for the given ID. Creates a peer proxy if necessary.
596  * Sends a connection request to the master if desired.
597  *
598  * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died
599  * or ENOMEM if memory couldn't be allocated.
600  */
peerProxyGetOrCreate(Peer * peer,pid_t pid,bool requestConnection)601 static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,
602         bool requestConnection) {
603     if (pid == peer->pid) {
604         errno = EINVAL;
605         return NULL;
606     }
607 
608     if (peerIsDead(peer, pid)) {
609         errno = EHOSTDOWN;
610         return NULL;
611     }
612 
613     PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);
614     if (peerProxy != NULL) {
615         return peerProxy;
616     }
617 
618     // If this is the master peer, we already know about all peers.
619     if (peer->master) {
620         errno = EHOSTDOWN;
621         return NULL;
622     }
623 
624     // Try to create a peer proxy.
625     Credentials credentials;
626     credentials.pid = pid;
627 
628     // Fake gid and uid until we have the real thing. The real creds are
629     // filled in by masterProxyExpectConnection(). These fake creds will
630     // never be exposed to the user.
631     credentials.uid = 0;
632     credentials.gid = 0;
633 
634     // Make sure we can allocate the connection request packet.
635     OutgoingPacket* packet = NULL;
636     if (requestConnection) {
637         packet = calloc(1, sizeof(OutgoingPacket));
638         if (packet == NULL) {
639             errno = ENOMEM;
640             return NULL;
641         }
642 
643         packet->header.type = CONNECTION_REQUEST;
644         packet->header.credentials = credentials;
645         packet->free = &outgoingPacketFree;
646     }
647 
648     peerProxy = peerProxyCreate(peer, credentials);
649     if (peerProxy == NULL) {
650         free(packet);
651         errno = ENOMEM;
652         return NULL;
653     } else {
654         // Send a connection request to the master.
655         if (requestConnection) {
656             PeerProxy* masterProxy = peer->masterProxy;
657             peerProxyEnqueueOutgoingPacket(masterProxy, packet);
658         }
659 
660         return peerProxy;
661     }
662 }
663 
664 /**
665  * Switches the master peer proxy into a state where it's waiting for a
666  * connection from the master.
667  */
masterProxyExpectConnection(PeerProxy * masterProxy,Header * header)668 static void masterProxyExpectConnection(PeerProxy* masterProxy,
669         Header* header) {
670     // TODO: Restructure things so we don't need this check.
671     // Verify that this really is the master.
672     if (!masterProxy->master) {
673         LOGW("Non-master process %d tried to send us a connection.",
674             masterProxy->credentials.pid);
675         // Kill off the evil peer.
676         peerProxyKill(masterProxy, false);
677         return;
678     }
679 
680     masterProxy->inputState = ACCEPTING_CONNECTION;
681     Peer* localPeer = masterProxy->peer;
682 
683     // Create a peer proxy so we have somewhere to stash the creds.
684     // See if we already have a proxy set up.
685     pid_t pid = header->credentials.pid;
686     peerLock(localPeer);
687     PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);
688     if (peerProxy == NULL) {
689         LOGW("Peer proxy creation failed: %s", strerror(errno));
690     } else {
691         // Fill in full credentials.
692         peerProxy->credentials = header->credentials;
693     }
694     peerUnlock(localPeer);
695 
696     // Keep track of which peer proxy we're accepting a connection for.
697     masterProxy->connecting = peerProxy;
698 }
699 
700 /**
701  * Reads input from a peer process.
702  */
703 static void peerProxyRead(SelectableFd* fd);
704 
705 /** Sets up fd callbacks. */
peerProxySetFd(PeerProxy * peerProxy,SelectableFd * fd)706 static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
707     peerProxy->fd = fd;
708     fd->data = peerProxy;
709     fd->onReadable = &peerProxyRead;
710     fd->beforeSelect = &peerProxyBeforeSelect;
711 
712     // Make the socket non-blocking.
713     setNonBlocking(fd->fd);
714 }
715 
716 /**
717  * Accepts a connection sent by the master proxy.
718  */
masterProxyAcceptConnection(PeerProxy * masterProxy)719 static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
720     struct msghdr msg;
721     struct iovec iov[1];
722     ssize_t size;
723     char ignored;
724     int incomingFd;
725 
726     // TODO: Reuse code which writes the connection. Who the heck designed
727     // this API anyway?
728     union {
729         struct cmsghdr cm;
730         char control[CMSG_SPACE(sizeof(int))];
731     } control_un;
732     struct cmsghdr *cmptr;
733     msg.msg_control = control_un.control;
734     msg.msg_controllen = sizeof(control_un.control);
735 
736     msg.msg_name = NULL;
737     msg.msg_namelen = 0;
738 
739     // We sent 1 byte of data so we can detect EOF.
740     iov[0].iov_base = &ignored;
741     iov[0].iov_len = 1;
742     msg.msg_iov = iov;
743     msg.msg_iovlen = 1;
744 
745     size = recvmsg(masterProxy->fd->fd, &msg, 0);
746     if (size < 0) {
747         if (errno == EINTR) {
748             // Log interruptions but otherwise ignore them.
749             LOGW("recvmsg() interrupted.");
750             return;
751         } else if (errno == EAGAIN) {
752             // Keep waiting for the connection.
753             return;
754         } else {
755             LOG_ALWAYS_FATAL("Error reading connection from master: %s",
756                     strerror(errno));
757         }
758     } else if (size == 0) {
759         // EOF.
760         LOG_ALWAYS_FATAL("Received EOF from master.");
761     }
762 
763     // Extract fd from message.
764     if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
765             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
766         if (cmptr->cmsg_level != SOL_SOCKET) {
767             LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
768         }
769         if (cmptr->cmsg_type != SCM_RIGHTS) {
770             LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
771         }
772         incomingFd = *((int*) CMSG_DATA(cmptr));
773     } else {
774         LOG_ALWAYS_FATAL("Expected fd.");
775     }
776 
777     // The peer proxy this connection is for.
778     PeerProxy* peerProxy = masterProxy->connecting;
779     if (peerProxy == NULL) {
780         LOGW("Received connection for unknown peer.");
781         closeWithWarning(incomingFd);
782     } else {
783         Peer* peer = masterProxy->peer;
784 
785         SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
786         if (selectableFd == NULL) {
787             LOGW("Error adding fd to selector for %d.",
788                     peerProxy->credentials.pid);
789             closeWithWarning(incomingFd);
790             peerProxyKill(peerProxy, false);
791         }
792 
793         peerProxySetFd(peerProxy, selectableFd);
794     }
795 
796     peerProxyExpectHeader(masterProxy);
797 }
798 
799 /**
800  * Frees an outgoing packet containing a connection.
801  */
outgoingPacketFreeSocket(OutgoingPacket * packet)802 static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
803     closeWithWarning(packet->socket);
804     outgoingPacketFree(packet);
805 }
806 
807 /**
808  * Connects two known peers.
809  */
masterConnectPeers(PeerProxy * peerA,PeerProxy * peerB)810 static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
811     int sockets[2];
812     int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
813     if (result == -1) {
814         LOGW("socketpair() error: %s", strerror(errno));
815         // TODO: Send CONNECTION_FAILED packets to peers.
816         return;
817     }
818 
819     OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
820     OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
821     if (packetA == NULL || packetB == NULL) {
822         free(packetA);
823         free(packetB);
824         LOGW("malloc() error. Failed to tell process %d that process %d is"
825                 " dead.", peerA->credentials.pid, peerB->credentials.pid);
826         return;
827     }
828 
829     packetA->header.type = CONNECTION;
830     packetB->header.type = CONNECTION;
831 
832     packetA->header.credentials = peerB->credentials;
833     packetB->header.credentials = peerA->credentials;
834 
835     packetA->socket = sockets[0];
836     packetB->socket = sockets[1];
837 
838     packetA->free = &outgoingPacketFreeSocket;
839     packetB->free = &outgoingPacketFreeSocket;
840 
841     peerLock(peerA->peer);
842     peerProxyEnqueueOutgoingPacket(peerA, packetA);
843     peerProxyEnqueueOutgoingPacket(peerB, packetB);
844     peerUnlock(peerA->peer);
845 }
846 
847 /**
848  * Informs a peer that the peer they're trying to connect to couldn't be
849  * found.
850  */
masterReportConnectionError(PeerProxy * peerProxy,Credentials credentials)851 static void masterReportConnectionError(PeerProxy* peerProxy,
852         Credentials credentials) {
853     OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
854     if (packet == NULL) {
855         LOGW("malloc() error. Failed to tell process %d that process %d is"
856                 " dead.", peerProxy->credentials.pid, credentials.pid);
857         return;
858     }
859 
860     packet->header.type = CONNECTION_ERROR;
861     packet->header.credentials = credentials;
862     packet->free = &outgoingPacketFree;
863 
864     peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);
865 }
866 
867 /**
868  * Handles a request to be connected to another peer.
869  */
masterHandleConnectionRequest(PeerProxy * peerProxy,Header * header)870 static void masterHandleConnectionRequest(PeerProxy* peerProxy,
871         Header* header) {
872     Peer* master = peerProxy->peer;
873     pid_t targetPid = header->credentials.pid;
874     if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {
875         // We haven't connected these peers yet.
876         PeerProxy* targetPeer
877             = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
878         if (targetPeer == NULL) {
879             // Unknown process.
880             masterReportConnectionError(peerProxy, header->credentials);
881         } else {
882             masterConnectPeers(peerProxy, targetPeer);
883         }
884     }
885 
886     // This packet is complete. Get ready for the next one.
887     peerProxyExpectHeader(peerProxy);
888 }
889 
890 /**
891  * The master told us this peer is dead.
892  */
masterProxyHandleConnectionError(PeerProxy * masterProxy,Header * header)893 static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
894         Header* header) {
895     Peer* peer = masterProxy->peer;
896 
897     // Look up the peer proxy.
898     pid_t pid = header->credentials.pid;
899     PeerProxy* peerProxy = NULL;
900     peerLock(peer);
901     peerProxy = hashmapGet(peer->peerProxies, &pid);
902     peerUnlock(peer);
903 
904     if (peerProxy != NULL) {
905         LOGI("Couldn't connect to %d.", pid);
906         peerProxyKill(peerProxy, false);
907     } else {
908         LOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
909     }
910 
911     peerProxyExpectHeader(masterProxy);
912 }
913 
914 /**
915  * Handles a packet header.
916  */
peerProxyHandleHeader(PeerProxy * peerProxy,Header * header)917 static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
918     switch (header->type) {
919         case CONNECTION_REQUEST:
920             masterHandleConnectionRequest(peerProxy, header);
921             break;
922         case CONNECTION:
923             masterProxyExpectConnection(peerProxy, header);
924             break;
925         case CONNECTION_ERROR:
926             masterProxyHandleConnectionError(peerProxy, header);
927             break;
928         case BYTES:
929             peerProxyExpectBytes(peerProxy, header);
930             break;
931         default:
932             LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,
933                     header->type);
934             peerProxyKill(peerProxy, false);
935     }
936 }
937 
938 /**
939  * Buffers input sent by peer. May be called multiple times until the entire
940  * buffer is filled. Returns true when the buffer is full.
941  */
peerProxyBufferInput(PeerProxy * peerProxy)942 static bool peerProxyBufferInput(PeerProxy* peerProxy) {
943     Buffer* in = peerProxy->inputBuffer;
944     ssize_t size = bufferRead(in, peerProxy->fd->fd);
945     if (size < 0) {
946         peerProxyHandleError(peerProxy, "read");
947         return false;
948     } else if (size == 0) {
949         // EOF.
950     	LOGI("EOF");
951         peerProxyKill(peerProxy, false);
952         return false;
953     } else if (bufferReadComplete(in)) {
954         // We're done!
955         return true;
956     } else {
957         // Continue reading.
958         return false;
959     }
960 }
961 
962 /**
963  * Reads input from a peer process.
964  */
peerProxyRead(SelectableFd * fd)965 static void peerProxyRead(SelectableFd* fd) {
966     LOGD("Reading...");
967     PeerProxy* peerProxy = (PeerProxy*) fd->data;
968     int state = peerProxy->inputState;
969     Buffer* in = peerProxy->inputBuffer;
970     switch (state) {
971         case READING_HEADER:
972             if (peerProxyBufferInput(peerProxy)) {
973                 LOGD("Header read.");
974                 // We've read the complete header.
975                 Header* header = (Header*) in->data;
976                 peerProxyHandleHeader(peerProxy, header);
977             }
978             break;
979         case READING_BYTES:
980             LOGD("Reading bytes...");
981             if (peerProxyBufferInput(peerProxy)) {
982                 LOGD("Bytes read.");
983                 // We have the complete packet. Notify bytes listener.
984                 peerProxy->peer->onBytes(peerProxy->credentials,
985                     in->data, in->size);
986 
987                 // Get ready for the next packet.
988                 peerProxyExpectHeader(peerProxy);
989             }
990             break;
991         case ACCEPTING_CONNECTION:
992             masterProxyAcceptConnection(peerProxy);
993             break;
994         default:
995             LOG_ALWAYS_FATAL("Unknown state: %d", state);
996     }
997 }
998 
peerProxyCreate(Peer * peer,Credentials credentials)999 static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
1000     PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));
1001     if (peerProxy == NULL) {
1002         return NULL;
1003     }
1004 
1005     peerProxy->inputBuffer = bufferCreate(sizeof(Header));
1006     if (peerProxy->inputBuffer == NULL) {
1007         free(peerProxy);
1008         return NULL;
1009     }
1010 
1011     peerProxy->peer = peer;
1012     peerProxy->credentials = credentials;
1013 
1014     // Initial state == expecting a header.
1015     peerProxyExpectHeader(peerProxy);
1016 
1017     // Add this proxy to the map. Make sure the key points to the stable memory
1018     // inside of the peer proxy itself.
1019     pid_t* pid = &(peerProxy->credentials.pid);
1020     hashmapPut(peer->peerProxies, pid, peerProxy);
1021     return peerProxy;
1022 }
1023 
1024 /** Accepts a connection to the master peer. */
masterAcceptConnection(SelectableFd * listenerFd)1025 static void masterAcceptConnection(SelectableFd* listenerFd) {
1026     // Accept connection.
1027     int socket = accept(listenerFd->fd, NULL, NULL);
1028     if (socket == -1) {
1029         LOGW("accept() error: %s", strerror(errno));
1030         return;
1031     }
1032 
1033     LOGD("Accepted connection as fd %d.", socket);
1034 
1035     // Get credentials.
1036     Credentials credentials;
1037     struct ucred ucredentials;
1038     socklen_t credentialsSize = sizeof(struct ucred);
1039     int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,
1040                 &ucredentials, &credentialsSize);
1041     // We might want to verify credentialsSize.
1042     if (result == -1) {
1043         LOGW("getsockopt() error: %s", strerror(errno));
1044         closeWithWarning(socket);
1045         return;
1046     }
1047 
1048     // Copy values into our own structure so we know we have the types right.
1049     credentials.pid = ucredentials.pid;
1050     credentials.uid = ucredentials.uid;
1051     credentials.gid = ucredentials.gid;
1052 
1053     LOGI("Accepted connection from process %d.", credentials.pid);
1054 
1055     Peer* masterPeer = (Peer*) listenerFd->data;
1056 
1057     peerLock(masterPeer);
1058 
1059     // Make sure we don't already have a connection from that process.
1060     PeerProxy* peerProxy
1061         = hashmapGet(masterPeer->peerProxies, &credentials.pid);
1062     if (peerProxy != NULL) {
1063         peerUnlock(masterPeer);
1064         LOGW("Alread connected to process %d.", credentials.pid);
1065         closeWithWarning(socket);
1066         return;
1067     }
1068 
1069     // Add connection to the selector.
1070     SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
1071     if (socketFd == NULL) {
1072         peerUnlock(masterPeer);
1073         LOGW("malloc() failed.");
1074         closeWithWarning(socket);
1075         return;
1076     }
1077 
1078     // Create a peer proxy.
1079     peerProxy = peerProxyCreate(masterPeer, credentials);
1080     peerUnlock(masterPeer);
1081     if (peerProxy == NULL) {
1082         LOGW("malloc() failed.");
1083         socketFd->remove = true;
1084         closeWithWarning(socket);
1085     }
1086     peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
1087     peerProxySetFd(peerProxy, socketFd);
1088 }
1089 
1090 /**
1091  * Creates the local peer.
1092  */
peerCreate()1093 static Peer* peerCreate() {
1094     Peer* peer = calloc(1, sizeof(Peer));
1095     if (peer == NULL) {
1096         LOG_ALWAYS_FATAL("malloc() error.");
1097     }
1098     peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
1099     peer->selector = selectorCreate();
1100 
1101     pthread_mutexattr_t attributes;
1102     if (pthread_mutexattr_init(&attributes) != 0) {
1103         LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
1104     }
1105     if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
1106         LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
1107     }
1108     if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
1109         LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
1110     }
1111 
1112     peer->pid = getpid();
1113     return peer;
1114 }
1115 
1116 /** The local peer. */
1117 static Peer* localPeer;
1118 
1119 /** Frees a packet of bytes. */
outgoingPacketFreeBytes(OutgoingPacket * packet)1120 static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
1121     LOGD("Freeing outgoing packet.");
1122     bufferFree(packet->bytes);
1123     free(packet);
1124 }
1125 
1126 /**
1127  * Sends a packet of bytes to a remote peer. Returns 0 on success.
1128  *
1129  * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
1130  * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
1131  * to EINVAL if pid is the same as the local pid.
1132  */
peerSendBytes(pid_t pid,const char * bytes,size_t size)1133 int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
1134 	Peer* peer = localPeer;
1135     assert(peer != NULL);
1136 
1137     OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
1138     if (packet == NULL) {
1139         errno = ENOMEM;
1140         return -1;
1141     }
1142 
1143     Buffer* copy = bufferCreate(size);
1144     if (copy == NULL) {
1145         free(packet);
1146         errno = ENOMEM;
1147         return -1;
1148     }
1149 
1150     // Copy data.
1151     memcpy(copy->data, bytes, size);
1152     copy->size = size;
1153 
1154     packet->bytes = copy;
1155     packet->header.type = BYTES;
1156     packet->header.size = size;
1157     packet->free = outgoingPacketFreeBytes;
1158     bufferPrepareForWrite(packet->bytes);
1159 
1160     peerLock(peer);
1161 
1162     PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
1163     if (peerProxy == NULL) {
1164         // The peer is already dead or we couldn't alloc memory. Either way,
1165         // errno is set.
1166         peerUnlock(peer);
1167         packet->free(packet);
1168         return -1;
1169     } else {
1170         peerProxyEnqueueOutgoingPacket(peerProxy, packet);
1171         peerUnlock(peer);
1172         selectorWakeUp(peer->selector);
1173         return 0;
1174     }
1175 }
1176 
/**
 * Remembers how to give shared bytes back to their owner: a release
 * callback together with the opaque argument it is to be called with.
 */
typedef struct SharedBytesFreer {
    /** Callback that releases the shared bytes. */
    void (*free)(void* context);

    /** Argument passed to the callback. */
    void* context;
} SharedBytesFreer;
1182 
1183 /** Frees shared bytes. */
outgoingPacketFreeSharedBytes(OutgoingPacket * packet)1184 static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
1185     SharedBytesFreer* sharedBytesFreer
1186         = (SharedBytesFreer*) packet->context;
1187     sharedBytesFreer->free(sharedBytesFreer->context);
1188     free(sharedBytesFreer);
1189     free(packet);
1190 }
1191 
1192 /**
1193  * Sends a packet of bytes to a remote peer without copying the bytes. Calls
1194  * free() with context after the bytes have been sent.
1195  *
1196  * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
1197  * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
1198  * to EINVAL if pid is the same as the local pid.
1199  */
peerSendSharedBytes(pid_t pid,char * bytes,size_t size,void (* free)(void * context),void * context)1200 int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
1201         void (*free)(void* context), void* context) {
1202     Peer* peer = localPeer;
1203     assert(peer != NULL);
1204 
1205     OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
1206     if (packet == NULL) {
1207         errno = ENOMEM;
1208         return -1;
1209     }
1210 
1211     Buffer* wrapper = bufferWrap(bytes, size, size);
1212     if (wrapper == NULL) {
1213         free(packet);
1214         errno = ENOMEM;
1215         return -1;
1216     }
1217 
1218     SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
1219     if (sharedBytesFreer == NULL) {
1220         free(packet);
1221         free(wrapper);
1222         errno = ENOMEM;
1223         return -1;
1224     }
1225     sharedBytesFreer->free = free;
1226     sharedBytesFreer->context = context;
1227 
1228     packet->bytes = wrapper;
1229     packet->context = sharedBytesFreer;
1230     packet->header.type = BYTES;
1231     packet->header.size = size;
1232     packet->free = &outgoingPacketFreeSharedBytes;
1233     bufferPrepareForWrite(packet->bytes);
1234 
1235     peerLock(peer);
1236 
1237     PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
1238     if (peerProxy == NULL) {
1239         // The peer is already dead or we couldn't alloc memory. Either way,
1240         // errno is set.
1241         peerUnlock(peer);
1242         packet->free(packet);
1243         return -1;
1244     } else {
1245         peerProxyEnqueueOutgoingPacket(peerProxy, packet);
1246         peerUnlock(peer);
1247         selectorWakeUp(peer->selector);
1248         return 0;
1249     }
1250 }
1251 
1252 /**
1253  * Starts the master peer. The master peer differs from other peers in that
1254  * it is responsible for connecting the other peers. You can only have one
1255  * master peer.
1256  *
1257  * Goes into an I/O loop and does not return.
1258  */
masterPeerInitialize(BytesListener * bytesListener,DeathListener * deathListener)1259 void masterPeerInitialize(BytesListener* bytesListener,
1260         DeathListener* deathListener) {
1261     // Create and bind socket.
1262     int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
1263     if (listenerSocket == -1) {
1264         LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
1265     }
1266     unlink(MASTER_PATH);
1267     int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),
1268             sizeof(UnixAddress));
1269     if (result == -1) {
1270         LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
1271     }
1272 
1273     LOGD("Listener socket: %d",  listenerSocket);
1274 
1275     // Queue up to 16 connections.
1276     result = listen(listenerSocket, 16);
1277     if (result != 0) {
1278         LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
1279     }
1280 
1281     // Make socket non-blocking.
1282     setNonBlocking(listenerSocket);
1283 
1284     // Create the peer for this process. Fail if we already have one.
1285     if (localPeer != NULL) {
1286         LOG_ALWAYS_FATAL("Peer is already initialized.");
1287     }
1288     localPeer = peerCreate();
1289     if (localPeer == NULL) {
1290         LOG_ALWAYS_FATAL("malloc() failed.");
1291     }
1292     localPeer->master = true;
1293     localPeer->onBytes = bytesListener;
1294     localPeer->onDeath = deathListener;
1295 
1296     // Make listener socket selectable.
1297     SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
1298     if (listenerFd == NULL) {
1299         LOG_ALWAYS_FATAL("malloc() error.");
1300     }
1301     listenerFd->data = localPeer;
1302     listenerFd->onReadable = &masterAcceptConnection;
1303 }
1304 
1305 /**
1306  * Starts a local peer.
1307  *
1308  * Goes into an I/O loop and does not return.
1309  */
peerInitialize(BytesListener * bytesListener,DeathListener * deathListener)1310 void peerInitialize(BytesListener* bytesListener,
1311         DeathListener* deathListener) {
1312     // Connect to master peer.
1313     int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
1314     if (masterSocket == -1) {
1315         LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
1316     }
1317     int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
1318             sizeof(UnixAddress));
1319     if (result != 0) {
1320         LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
1321     }
1322 
1323     // Create the peer for this process. Fail if we already have one.
1324     if (localPeer != NULL) {
1325         LOG_ALWAYS_FATAL("Peer is already initialized.");
1326     }
1327     localPeer = peerCreate();
1328     if (localPeer == NULL) {
1329         LOG_ALWAYS_FATAL("malloc() failed.");
1330     }
1331     localPeer->onBytes = bytesListener;
1332     localPeer->onDeath = deathListener;
1333 
1334     // Make connection selectable.
1335     SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
1336     if (masterFd == NULL) {
1337         LOG_ALWAYS_FATAL("malloc() error.");
1338     }
1339 
1340     // Create a peer proxy for the master peer.
1341     PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
1342     if (masterProxy == NULL) {
1343         LOG_ALWAYS_FATAL("malloc() error.");
1344     }
1345     peerProxySetFd(masterProxy, masterFd);
1346     masterProxy->master = true;
1347     localPeer->masterProxy = masterProxy;
1348 }
1349 
1350 /** Starts the master peer I/O loop. Doesn't return. */
peerLoop()1351 void peerLoop() {
1352     assert(localPeer != NULL);
1353 
1354     // Start selector.
1355     selectorLoop(localPeer->selector);
1356 }
1357 
1358