• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  /*
2   * Copyright (C) 2007 The Android Open Source Project
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  #define LOG_TAG "mq"
18  
19  #include <assert.h>
20  #include <errno.h>
21  #include <fcntl.h>
22  #include <pthread.h>
23  #include <stdlib.h>
24  #include <string.h>
25  #include <unistd.h>
26  
27  #include <sys/socket.h>
28  #include <sys/types.h>
29  #include <sys/un.h>
30  #include <sys/uio.h>
31  
32  #include <cutils/array.h>
33  #include <cutils/hashmap.h>
34  #include <cutils/selector.h>
35  
36  #include "loghack.h"
37  #include "buffer.h"
38  
/** How many recently deceased peers each local peer keeps on record. */
#define PEER_HISTORY (16)

/** Short aliases for the generic and the Unix-domain socket address types. */
typedef struct sockaddr SocketAddress;
typedef struct sockaddr_un UnixAddress;
44  
/**
 * Identity of a process: its process, user and group IDs. A private struct
 * is used rather than ucred because ucred only exists on Linux.
 */
typedef struct {
    pid_t pid;
    uid_t uid;
    gid_t gid;
} Credentials;
54  
55  /** Listens for bytes coming from remote peers. */
56  typedef void BytesListener(Credentials credentials, char* bytes, size_t size);
57  
58  /** Listens for the deaths of remote peers. */
59  typedef void DeathListener(pid_t pid);
60  
/** Kinds of packets exchanged between peers. */
typedef enum {
    /** Asks for a connection to some other peer. */
    CONNECTION_REQUEST,

    /** Carries a connection to another peer. */
    CONNECTION,

    /** Signals that a connection attempt failed. */
    CONNECTION_ERROR,

    /** Plain payload of bytes. */
    BYTES,
} PacketType;
75  
/** What a peer proxy is currently expecting on its input side. */
typedef enum {
    /** A packet header is being read. */
    READING_HEADER,

    /** A connection from the master is awaited. */
    ACCEPTING_CONNECTION,

    /** Packet bytes are being read. */
    READING_BYTES,
} InputState;
86  
87  /** A packet header. */
88  // TODO: Use custom headers for master->peer, peer->master, peer->peer.
89  typedef struct {
90      PacketType type;
91      union {
92          /** Packet size. Used for BYTES. */
93          size_t size;
94  
95          /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */
96          Credentials credentials;
97      };
98  } Header;
99  
100  /** A packet which will be sent to a peer. */
101  typedef struct OutgoingPacket OutgoingPacket;
102  struct OutgoingPacket {
103      /** Packet header. */
104      Header header;
105  
106      union {
107          /** Connection to peer. Used with CONNECTION. */
108          int socket;
109  
110          /** Buffer of bytes. Used with BYTES. */
111          Buffer* bytes;
112      };
113  
114      /** Frees all resources associated with this packet. */
115      void (*free)(OutgoingPacket* packet);
116  
117      /** Optional context. */
118      void* context;
119  
120      /** Next packet in the queue. */
121      OutgoingPacket* nextPacket;
122  };
123  
124  /** Represents a remote peer. */
125  typedef struct PeerProxy PeerProxy;
126  
127  /** Local peer state. You typically have one peer per process. */
128  typedef struct {
129      /** This peer's PID. */
130      pid_t pid;
131  
132      /**
133       * Map from pid to peer proxy. The peer has a peer proxy for each remote
134       * peer it's connected to.
135       *
136       * Acquire mutex before use.
137       */
138      Hashmap* peerProxies;
139  
140      /** Manages I/O. */
141      Selector* selector;
142  
143      /** Used to synchronize operations with the selector thread. */
144      pthread_mutex_t mutex;
145  
146      /** Is this peer the master? */
147      bool master;
148  
149      /** Peer proxy for the master. */
150      PeerProxy* masterProxy;
151  
152      /** Listens for packets from remote peers. */
153      BytesListener* onBytes;
154  
155      /** Listens for deaths of remote peers. */
156      DeathListener* onDeath;
157  
158      /** Keeps track of recently dead peers. Requires mutex. */
159      pid_t deadPeers[PEER_HISTORY];
160      size_t deadPeerCursor;
161  } Peer;
162  
163  struct PeerProxy {
164      /** Credentials of the remote process. */
165      Credentials credentials;
166  
167      /** Keeps track of data coming in from the remote peer. */
168      InputState inputState;
169      Buffer* inputBuffer;
170      PeerProxy* connecting;
171  
172      /** File descriptor for this peer. */
173      SelectableFd* fd;
174  
175      /**
176       * Queue of packets to be written out to the remote peer.
177       *
178       * Requires mutex.
179       */
180      // TODO: Limit queue length.
181      OutgoingPacket* currentPacket;
182      OutgoingPacket* lastPacket;
183  
184      /** Used to write outgoing header. */
185      Buffer outgoingHeader;
186  
187      /** True if this is the master's proxy. */
188      bool master;
189  
190      /** Reference back to the local peer. */
191      Peer* peer;
192  
193      /**
194       * Used in master only. Maps this peer proxy to other peer proxies to
195       * which the peer has been connected to. Maps pid to PeerProxy. Helps
196       * keep track of which connections we've sent to whom.
197       */
198      Hashmap* connections;
199  };
200  
201  /** Server socket path. */
202  static const char* MASTER_PATH = "/master.peer";
203  
204  /** Credentials of the master peer. */
205  static const Credentials MASTER_CREDENTIALS = {0, 0, 0};
206  
207  /** Creates a peer proxy and adds it to the peer proxy map. */
208  static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
209  
/**
 * Puts the given descriptor into non-blocking mode while keeping its other
 * status flags. A failing fcntl() call is reported through LOG_ALWAYS_FATAL.
 */
static void setNonBlocking(int fd) {
    int currentFlags = fcntl(fd, F_GETFL, 0);
    if (currentFlags < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }

    int outcome = fcntl(fd, F_SETFL, currentFlags | O_NONBLOCK);
    if (outcome < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }
}
220  
/** Closes the descriptor; a failed close() is logged as a warning only. */
static void closeWithWarning(int fd) {
    if (close(fd) == -1) {
        ALOGW("close() error: %s", strerror(errno));
    }
}
228  
/** Hash function for keys that point at a pid_t: the pid itself is the hash. */
static int pidHash(void* key) {
    return (int) *((pid_t*) key);
}
234  
/** Equality function for keys that point at a pid_t: compares the pids. */
static bool pidEquals(void* keyA, void* keyB) {
    pid_t left = *((pid_t*) keyA);
    pid_t right = *((pid_t*) keyB);
    return left == right;
}
241  
242  /** Gets the master address. Not thread safe. */
getMasterAddress()243  static UnixAddress* getMasterAddress() {
244      static UnixAddress masterAddress;
245      static bool initialized = false;
246      if (initialized == false) {
247          masterAddress.sun_family = AF_LOCAL;
248          strcpy(masterAddress.sun_path, MASTER_PATH);
249          initialized = true;
250      }
251      return &masterAddress;
252  }
253  
254  /** Gets exclusive access to the peer for this thread. */
peerLock(Peer * peer)255  static void peerLock(Peer* peer) {
256      pthread_mutex_lock(&peer->mutex);
257  }
258  
259  /** Releases exclusive access to the peer. */
peerUnlock(Peer * peer)260  static void peerUnlock(Peer* peer) {
261      pthread_mutex_unlock(&peer->mutex);
262  }
263  
264  /** Frees a simple, i.e. header-only, outgoing packet. */
outgoingPacketFree(OutgoingPacket * packet)265  static void outgoingPacketFree(OutgoingPacket* packet) {
266      ALOGD("Freeing outgoing packet.");
267  	free(packet);
268  }
269  
270  /**
271   * Prepare to read a new packet from the peer.
272   */
peerProxyExpectHeader(PeerProxy * peerProxy)273  static void peerProxyExpectHeader(PeerProxy* peerProxy) {
274      peerProxy->inputState = READING_HEADER;
275      bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));
276  }
277  
278  /** Sets up the buffer for the outgoing header. */
peerProxyPrepareOutgoingHeader(PeerProxy * peerProxy)279  static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
280      peerProxy->outgoingHeader.data
281          = (char*) &(peerProxy->currentPacket->header);
282      peerProxy->outgoingHeader.size = sizeof(Header);
283      bufferPrepareForWrite(&peerProxy->outgoingHeader);
284  }
285  
286  /** Adds a packet to the end of the queue. Callers must have the mutex. */
peerProxyEnqueueOutgoingPacket(PeerProxy * peerProxy,OutgoingPacket * newPacket)287  static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
288          OutgoingPacket* newPacket) {
289      newPacket->nextPacket = NULL; // Just in case.
290      if (peerProxy->currentPacket == NULL) {
291          // The queue is empty.
292          peerProxy->currentPacket = newPacket;
293          peerProxy->lastPacket = newPacket;
294  
295          peerProxyPrepareOutgoingHeader(peerProxy);
296      } else {
297          peerProxy->lastPacket->nextPacket = newPacket;
298      }
299  }
300  
301  /** Takes the peer lock and enqueues the given packet. */
peerProxyLockAndEnqueueOutgoingPacket(PeerProxy * peerProxy,OutgoingPacket * newPacket)302  static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
303          OutgoingPacket* newPacket) {
304      Peer* peer = peerProxy->peer;
305      peerLock(peer);
306      peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
307      peerUnlock(peer);
308  }
309  
310  /**
311   * Frees current packet and moves to the next one. Returns true if there is
312   * a next packet or false if the queue is empty.
313   */
peerProxyNextPacket(PeerProxy * peerProxy)314  static bool peerProxyNextPacket(PeerProxy* peerProxy) {
315      Peer* peer = peerProxy->peer;
316      peerLock(peer);
317  
318      OutgoingPacket* current = peerProxy->currentPacket;
319  
320      if (current == NULL) {
321      	// The queue is already empty.
322          peerUnlock(peer);
323          return false;
324      }
325  
326      OutgoingPacket* next = current->nextPacket;
327      peerProxy->currentPacket = next;
328      current->nextPacket = NULL;
329      current->free(current);
330      if (next == NULL) {
331          // The queue is empty.
332          peerProxy->lastPacket = NULL;
333          peerUnlock(peer);
334          return false;
335      } else {
336          peerUnlock(peer);
337          peerProxyPrepareOutgoingHeader(peerProxy);
338  
339          // TODO: Start writing next packet? It would reduce the number of
340          // system calls, but we could also starve other peers.
341          return true;
342      }
343  }
344  
345  /**
346   * Checks whether a peer died recently.
347   */
peerIsDead(Peer * peer,pid_t pid)348  static bool peerIsDead(Peer* peer, pid_t pid) {
349      size_t i;
350      for (i = 0; i < PEER_HISTORY; i++) {
351          pid_t deadPeer = peer->deadPeers[i];
352          if (deadPeer == 0) {
353              return false;
354          }
355          if (deadPeer == pid) {
356              return true;
357          }
358      }
359      return false;
360  }
361  
362  /**
363   * Cleans up connection information.
364   */
peerProxyRemoveConnection(void * key,void * value,void * context)365  static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
366      PeerProxy* deadPeer = (PeerProxy*) context;
367      PeerProxy* otherPeer = (PeerProxy*) value;
368      hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));
369      return true;
370  }
371  
372  /**
373   * Called when the peer dies.
374   */
peerProxyKill(PeerProxy * peerProxy,bool errnoIsSet)375  static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
376      if (errnoIsSet) {
377          ALOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,
378                  strerror(errno));
379      } else {
380          ALOGI("Peer %d died.", peerProxy->credentials.pid);
381      }
382  
383      // If we lost the master, we're up a creek. We can't let this happen.
384      if (peerProxy->master) {
385          LOG_ALWAYS_FATAL("Lost connection to master.");
386      }
387  
388      Peer* localPeer = peerProxy->peer;
389      pid_t pid = peerProxy->credentials.pid;
390  
391      peerLock(localPeer);
392  
393      // Remember for awhile that the peer died.
394      localPeer->deadPeers[localPeer->deadPeerCursor]
395          = peerProxy->credentials.pid;
396      localPeer->deadPeerCursor++;
397      if (localPeer->deadPeerCursor == PEER_HISTORY) {
398          localPeer->deadPeerCursor = 0;
399      }
400  
401      // Remove from peer map.
402      hashmapRemove(localPeer->peerProxies, &pid);
403  
404      // External threads can no longer get to this peer proxy, so we don't
405      // need the lock anymore.
406      peerUnlock(localPeer);
407  
408      // Remove the fd from the selector.
409      if (peerProxy->fd != NULL) {
410          peerProxy->fd->remove = true;
411      }
412  
413      // Clear outgoing packet queue.
414      while (peerProxyNextPacket(peerProxy)) {}
415  
416      bufferFree(peerProxy->inputBuffer);
417  
418      // This only applies to the master.
419      if (peerProxy->connections != NULL) {
420          // We can't leave these other maps pointing to freed memory.
421          hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,
422                  peerProxy);
423          hashmapFree(peerProxy->connections);
424      }
425  
426      // Invoke death listener.
427      localPeer->onDeath(pid);
428  
429      // Free the peer proxy itself.
430      free(peerProxy);
431  }
432  
/**
 * Reacts to a failed I/O call on a peer proxy, based on errno.
 *
 * EINTR is logged and otherwise ignored. EAGAIN / EWOULDBLOCK is ignored.
 * Any other error kills the peer proxy, so the proxy must not be used
 * afterwards in that case.
 *
 * peerProxy: proxy the failed call was made for.
 * functionName: name of the failed function, used in log messages only.
 */
static void peerProxyHandleError(PeerProxy* peerProxy,
        const char* functionName) {
    // Capture errno first: the logging calls below may overwrite it.
    int savedErrno = errno;

    if (savedErrno == EINTR) {
        // Log interruptions but otherwise ignore them.
        ALOGW("%s() interrupted.", functionName);
    } else if (savedErrno == EAGAIN || savedErrno == EWOULDBLOCK) {
        // POSIX allows EWOULDBLOCK to differ from EAGAIN, so check both.
        ALOGD("EWOULDBLOCK");
        // Ignore.
    } else {
        ALOGW("Error returned by %s().", functionName);
        // peerProxyKill() reports strerror(errno); give it the real cause.
        errno = savedErrno;
        peerProxyKill(peerProxy, true);
    }
}
445  
446  /**
447   * Buffers output sent to a peer. May be called multiple times until the entire
448   * buffer is filled. Returns true when the buffer is empty.
449   */
peerProxyWriteFromBuffer(PeerProxy * peerProxy,Buffer * outgoing)450  static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
451      ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);
452      if (size < 0) {
453          peerProxyHandleError(peerProxy, "write");
454          return false;
455      } else {
456          return bufferWriteComplete(outgoing);
457      }
458  }
459  
460  /** Writes packet bytes to peer. */
peerProxyWriteBytes(PeerProxy * peerProxy)461  static void peerProxyWriteBytes(PeerProxy* peerProxy) {
462  	Buffer* buffer = peerProxy->currentPacket->bytes;
463  	if (peerProxyWriteFromBuffer(peerProxy, buffer)) {
464          ALOGD("Bytes written.");
465          peerProxyNextPacket(peerProxy);
466      }
467  }
468  
469  /** Sends a socket to the peer. */
peerProxyWriteConnection(PeerProxy * peerProxy)470  static void peerProxyWriteConnection(PeerProxy* peerProxy) {
471      int socket = peerProxy->currentPacket->socket;
472  
473      // Why does sending and receiving fds have to be such a PITA?
474      struct msghdr msg;
475      struct iovec iov[1];
476  
477      union {
478          struct cmsghdr cm;
479          char control[CMSG_SPACE(sizeof(int))];
480      } control_un;
481  
482      struct cmsghdr *cmptr;
483  
484      msg.msg_control = control_un.control;
485      msg.msg_controllen = sizeof(control_un.control);
486      cmptr = CMSG_FIRSTHDR(&msg);
487      cmptr->cmsg_len = CMSG_LEN(sizeof(int));
488      cmptr->cmsg_level = SOL_SOCKET;
489      cmptr->cmsg_type = SCM_RIGHTS;
490  
491      // Store the socket in the message.
492      *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;
493  
494      msg.msg_name = NULL;
495      msg.msg_namelen = 0;
496      iov[0].iov_base = "";
497      iov[0].iov_len = 1;
498      msg.msg_iov = iov;
499      msg.msg_iovlen = 1;
500  
501      ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
502  
503      if (result < 0) {
504          peerProxyHandleError(peerProxy, "sendmsg");
505      } else {
506          // Success. Queue up the next packet.
507          peerProxyNextPacket(peerProxy);
508  
509      }
510  }
511  
512  /**
513   * Writes some outgoing data.
514   */
peerProxyWrite(SelectableFd * fd)515  static void peerProxyWrite(SelectableFd* fd) {
516      // TODO: Try to write header and body with one system call.
517  
518      PeerProxy* peerProxy = (PeerProxy*) fd->data;
519      OutgoingPacket* current = peerProxy->currentPacket;
520  
521      if (current == NULL) {
522          // We have nothing left to write.
523          return;
524      }
525  
526      // Write the header.
527      Buffer* outgoingHeader = &peerProxy->outgoingHeader;
528      bool headerWritten = bufferWriteComplete(outgoingHeader);
529      if (!headerWritten) {
530          ALOGD("Writing header...");
531          headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);
532          if (headerWritten) {
533              ALOGD("Header written.");
534          }
535      }
536  
537      // Write body.
538      if (headerWritten) {
539          PacketType type = current->header.type;
540          switch (type) {
541              case CONNECTION:
542                  peerProxyWriteConnection(peerProxy);
543                  break;
544              case BYTES:
545                  peerProxyWriteBytes(peerProxy);
546                  break;
547              case CONNECTION_REQUEST:
548              case CONNECTION_ERROR:
549                  // These packets consist solely of a header.
550                  peerProxyNextPacket(peerProxy);
551                  break;
552              default:
553                  LOG_ALWAYS_FATAL("Unknown packet type: %d", type);
554          }
555      }
556  }
557  
558  /**
559   * Sets up a peer proxy's fd before we try to select() it.
560   */
peerProxyBeforeSelect(SelectableFd * fd)561  static void peerProxyBeforeSelect(SelectableFd* fd) {
562      ALOGD("Before select...");
563  
564      PeerProxy* peerProxy = (PeerProxy*) fd->data;
565  
566      peerLock(peerProxy->peer);
567      bool hasPackets = peerProxy->currentPacket != NULL;
568      peerUnlock(peerProxy->peer);
569  
570      if (hasPackets) {
571          ALOGD("Packets found. Setting onWritable().");
572  
573          fd->onWritable = &peerProxyWrite;
574      } else {
575          // We have nothing to write.
576          fd->onWritable = NULL;
577      }
578  }
579  
580  /** Prepare to read bytes from the peer. */
peerProxyExpectBytes(PeerProxy * peerProxy,Header * header)581  static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
582      ALOGD("Expecting %d bytes.", header->size);
583  
584      peerProxy->inputState = READING_BYTES;
585      if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {
586          ALOGW("Couldn't allocate memory for incoming data. Size: %u",
587                  (unsigned int) header->size);
588  
589          // TODO: Ignore the packet and log a warning?
590          peerProxyKill(peerProxy, false);
591      }
592  }
593  
594  /**
595   * Gets a peer proxy for the given ID. Creates a peer proxy if necessary.
596   * Sends a connection request to the master if desired.
597   *
598   * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died
599   * or ENOMEM if memory couldn't be allocated.
600   */
peerProxyGetOrCreate(Peer * peer,pid_t pid,bool requestConnection)601  static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,
602          bool requestConnection) {
603      if (pid == peer->pid) {
604          errno = EINVAL;
605          return NULL;
606      }
607  
608      if (peerIsDead(peer, pid)) {
609          errno = EHOSTDOWN;
610          return NULL;
611      }
612  
613      PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);
614      if (peerProxy != NULL) {
615          return peerProxy;
616      }
617  
618      // If this is the master peer, we already know about all peers.
619      if (peer->master) {
620          errno = EHOSTDOWN;
621          return NULL;
622      }
623  
624      // Try to create a peer proxy.
625      Credentials credentials;
626      credentials.pid = pid;
627  
628      // Fake gid and uid until we have the real thing. The real creds are
629      // filled in by masterProxyExpectConnection(). These fake creds will
630      // never be exposed to the user.
631      credentials.uid = 0;
632      credentials.gid = 0;
633  
634      // Make sure we can allocate the connection request packet.
635      OutgoingPacket* packet = NULL;
636      if (requestConnection) {
637          packet = calloc(1, sizeof(OutgoingPacket));
638          if (packet == NULL) {
639              errno = ENOMEM;
640              return NULL;
641          }
642  
643          packet->header.type = CONNECTION_REQUEST;
644          packet->header.credentials = credentials;
645          packet->free = &outgoingPacketFree;
646      }
647  
648      peerProxy = peerProxyCreate(peer, credentials);
649      if (peerProxy == NULL) {
650          free(packet);
651          errno = ENOMEM;
652          return NULL;
653      } else {
654          // Send a connection request to the master.
655          if (requestConnection) {
656              PeerProxy* masterProxy = peer->masterProxy;
657              peerProxyEnqueueOutgoingPacket(masterProxy, packet);
658          }
659  
660          return peerProxy;
661      }
662  }
663  
664  /**
665   * Switches the master peer proxy into a state where it's waiting for a
666   * connection from the master.
667   */
masterProxyExpectConnection(PeerProxy * masterProxy,Header * header)668  static void masterProxyExpectConnection(PeerProxy* masterProxy,
669          Header* header) {
670      // TODO: Restructure things so we don't need this check.
671      // Verify that this really is the master.
672      if (!masterProxy->master) {
673          ALOGW("Non-master process %d tried to send us a connection.",
674              masterProxy->credentials.pid);
675          // Kill off the evil peer.
676          peerProxyKill(masterProxy, false);
677          return;
678      }
679  
680      masterProxy->inputState = ACCEPTING_CONNECTION;
681      Peer* localPeer = masterProxy->peer;
682  
683      // Create a peer proxy so we have somewhere to stash the creds.
684      // See if we already have a proxy set up.
685      pid_t pid = header->credentials.pid;
686      peerLock(localPeer);
687      PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);
688      if (peerProxy == NULL) {
689          ALOGW("Peer proxy creation failed: %s", strerror(errno));
690      } else {
691          // Fill in full credentials.
692          peerProxy->credentials = header->credentials;
693      }
694      peerUnlock(localPeer);
695  
696      // Keep track of which peer proxy we're accepting a connection for.
697      masterProxy->connecting = peerProxy;
698  }
699  
700  /**
701   * Reads input from a peer process.
702   */
703  static void peerProxyRead(SelectableFd* fd);
704  
705  /** Sets up fd callbacks. */
peerProxySetFd(PeerProxy * peerProxy,SelectableFd * fd)706  static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
707      peerProxy->fd = fd;
708      fd->data = peerProxy;
709      fd->onReadable = &peerProxyRead;
710      fd->beforeSelect = &peerProxyBeforeSelect;
711  
712      // Make the socket non-blocking.
713      setNonBlocking(fd->fd);
714  }
715  
716  /**
717   * Accepts a connection sent by the master proxy.
718   */
masterProxyAcceptConnection(PeerProxy * masterProxy)719  static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
720      struct msghdr msg;
721      struct iovec iov[1];
722      ssize_t size;
723      char ignored;
724      int incomingFd;
725  
726      // TODO: Reuse code which writes the connection. Who the heck designed
727      // this API anyway?
728      union {
729          struct cmsghdr cm;
730          char control[CMSG_SPACE(sizeof(int))];
731      } control_un;
732      struct cmsghdr *cmptr;
733      msg.msg_control = control_un.control;
734      msg.msg_controllen = sizeof(control_un.control);
735  
736      msg.msg_name = NULL;
737      msg.msg_namelen = 0;
738  
739      // We sent 1 byte of data so we can detect EOF.
740      iov[0].iov_base = &ignored;
741      iov[0].iov_len = 1;
742      msg.msg_iov = iov;
743      msg.msg_iovlen = 1;
744  
745      size = recvmsg(masterProxy->fd->fd, &msg, 0);
746      if (size < 0) {
747          if (errno == EINTR) {
748              // Log interruptions but otherwise ignore them.
749              ALOGW("recvmsg() interrupted.");
750              return;
751          } else if (errno == EAGAIN) {
752              // Keep waiting for the connection.
753              return;
754          } else {
755              LOG_ALWAYS_FATAL("Error reading connection from master: %s",
756                      strerror(errno));
757          }
758      } else if (size == 0) {
759          // EOF.
760          LOG_ALWAYS_FATAL("Received EOF from master.");
761      }
762  
763      // Extract fd from message.
764      if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
765              && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
766          if (cmptr->cmsg_level != SOL_SOCKET) {
767              LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
768          }
769          if (cmptr->cmsg_type != SCM_RIGHTS) {
770              LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
771          }
772          incomingFd = *((int*) CMSG_DATA(cmptr));
773      } else {
774          LOG_ALWAYS_FATAL("Expected fd.");
775      }
776  
777      // The peer proxy this connection is for.
778      PeerProxy* peerProxy = masterProxy->connecting;
779      if (peerProxy == NULL) {
780          ALOGW("Received connection for unknown peer.");
781          closeWithWarning(incomingFd);
782      } else {
783          Peer* peer = masterProxy->peer;
784  
785          SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
786          if (selectableFd == NULL) {
787              ALOGW("Error adding fd to selector for %d.",
788                      peerProxy->credentials.pid);
789              closeWithWarning(incomingFd);
790              peerProxyKill(peerProxy, false);
791          }
792  
793          peerProxySetFd(peerProxy, selectableFd);
794      }
795  
796      peerProxyExpectHeader(masterProxy);
797  }
798  
799  /**
800   * Frees an outgoing packet containing a connection.
801   */
outgoingPacketFreeSocket(OutgoingPacket * packet)802  static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
803      closeWithWarning(packet->socket);
804      outgoingPacketFree(packet);
805  }
806  
807  /**
808   * Connects two known peers.
809   */
masterConnectPeers(PeerProxy * peerA,PeerProxy * peerB)810  static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
811      int sockets[2];
812      int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
813      if (result == -1) {
814          ALOGW("socketpair() error: %s", strerror(errno));
815          // TODO: Send CONNECTION_FAILED packets to peers.
816          return;
817      }
818  
819      OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
820      OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
821      if (packetA == NULL || packetB == NULL) {
822          free(packetA);
823          free(packetB);
824          ALOGW("malloc() error. Failed to tell process %d that process %d is"
825                  " dead.", peerA->credentials.pid, peerB->credentials.pid);
826          return;
827      }
828  
829      packetA->header.type = CONNECTION;
830      packetB->header.type = CONNECTION;
831  
832      packetA->header.credentials = peerB->credentials;
833      packetB->header.credentials = peerA->credentials;
834  
835      packetA->socket = sockets[0];
836      packetB->socket = sockets[1];
837  
838      packetA->free = &outgoingPacketFreeSocket;
839      packetB->free = &outgoingPacketFreeSocket;
840  
841      peerLock(peerA->peer);
842      peerProxyEnqueueOutgoingPacket(peerA, packetA);
843      peerProxyEnqueueOutgoingPacket(peerB, packetB);
844      peerUnlock(peerA->peer);
845  }
846  
847  /**
848   * Informs a peer that the peer they're trying to connect to couldn't be
849   * found.
850   */
masterReportConnectionError(PeerProxy * peerProxy,Credentials credentials)851  static void masterReportConnectionError(PeerProxy* peerProxy,
852          Credentials credentials) {
853      OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
854      if (packet == NULL) {
855          ALOGW("malloc() error. Failed to tell process %d that process %d is"
856                  " dead.", peerProxy->credentials.pid, credentials.pid);
857          return;
858      }
859  
860      packet->header.type = CONNECTION_ERROR;
861      packet->header.credentials = credentials;
862      packet->free = &outgoingPacketFree;
863  
864      peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);
865  }
866  
867  /**
868   * Handles a request to be connected to another peer.
869   */
masterHandleConnectionRequest(PeerProxy * peerProxy,Header * header)870  static void masterHandleConnectionRequest(PeerProxy* peerProxy,
871          Header* header) {
872      Peer* master = peerProxy->peer;
873      pid_t targetPid = header->credentials.pid;
874      if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {
875          // We haven't connected these peers yet.
876          PeerProxy* targetPeer
877              = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
878          if (targetPeer == NULL) {
879              // Unknown process.
880              masterReportConnectionError(peerProxy, header->credentials);
881          } else {
882              masterConnectPeers(peerProxy, targetPeer);
883          }
884      }
885  
886      // This packet is complete. Get ready for the next one.
887      peerProxyExpectHeader(peerProxy);
888  }
889  
890  /**
891   * The master told us this peer is dead.
892   */
masterProxyHandleConnectionError(PeerProxy * masterProxy,Header * header)893  static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
894          Header* header) {
895      Peer* peer = masterProxy->peer;
896  
897      // Look up the peer proxy.
898      pid_t pid = header->credentials.pid;
899      PeerProxy* peerProxy = NULL;
900      peerLock(peer);
901      peerProxy = hashmapGet(peer->peerProxies, &pid);
902      peerUnlock(peer);
903  
904      if (peerProxy != NULL) {
905          ALOGI("Couldn't connect to %d.", pid);
906          peerProxyKill(peerProxy, false);
907      } else {
908          ALOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
909      }
910  
911      peerProxyExpectHeader(masterProxy);
912  }
913  
914  /**
915   * Handles a packet header.
916   */
peerProxyHandleHeader(PeerProxy * peerProxy,Header * header)917  static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
918      switch (header->type) {
919          case CONNECTION_REQUEST:
920              masterHandleConnectionRequest(peerProxy, header);
921              break;
922          case CONNECTION:
923              masterProxyExpectConnection(peerProxy, header);
924              break;
925          case CONNECTION_ERROR:
926              masterProxyHandleConnectionError(peerProxy, header);
927              break;
928          case BYTES:
929              peerProxyExpectBytes(peerProxy, header);
930              break;
931          default:
932              ALOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,
933                      header->type);
934              peerProxyKill(peerProxy, false);
935      }
936  }
937  
938  /**
939   * Buffers input sent by peer. May be called multiple times until the entire
940   * buffer is filled. Returns true when the buffer is full.
941   */
peerProxyBufferInput(PeerProxy * peerProxy)942  static bool peerProxyBufferInput(PeerProxy* peerProxy) {
943      Buffer* in = peerProxy->inputBuffer;
944      ssize_t size = bufferRead(in, peerProxy->fd->fd);
945      if (size < 0) {
946          peerProxyHandleError(peerProxy, "read");
947          return false;
948      } else if (size == 0) {
949          // EOF.
950      	ALOGI("EOF");
951          peerProxyKill(peerProxy, false);
952          return false;
953      } else if (bufferReadComplete(in)) {
954          // We're done!
955          return true;
956      } else {
957          // Continue reading.
958          return false;
959      }
960  }
961  
962  /**
963   * Reads input from a peer process.
964   */
peerProxyRead(SelectableFd * fd)965  static void peerProxyRead(SelectableFd* fd) {
966      ALOGD("Reading...");
967      PeerProxy* peerProxy = (PeerProxy*) fd->data;
968      int state = peerProxy->inputState;
969      Buffer* in = peerProxy->inputBuffer;
970      switch (state) {
971          case READING_HEADER:
972              if (peerProxyBufferInput(peerProxy)) {
973                  ALOGD("Header read.");
974                  // We've read the complete header.
975                  Header* header = (Header*) in->data;
976                  peerProxyHandleHeader(peerProxy, header);
977              }
978              break;
979          case READING_BYTES:
980              ALOGD("Reading bytes...");
981              if (peerProxyBufferInput(peerProxy)) {
982                  ALOGD("Bytes read.");
983                  // We have the complete packet. Notify bytes listener.
984                  peerProxy->peer->onBytes(peerProxy->credentials,
985                      in->data, in->size);
986  
987                  // Get ready for the next packet.
988                  peerProxyExpectHeader(peerProxy);
989              }
990              break;
991          case ACCEPTING_CONNECTION:
992              masterProxyAcceptConnection(peerProxy);
993              break;
994          default:
995              LOG_ALWAYS_FATAL("Unknown state: %d", state);
996      }
997  }
998  
peerProxyCreate(Peer * peer,Credentials credentials)999  static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
1000      PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));
1001      if (peerProxy == NULL) {
1002          return NULL;
1003      }
1004  
1005      peerProxy->inputBuffer = bufferCreate(sizeof(Header));
1006      if (peerProxy->inputBuffer == NULL) {
1007          free(peerProxy);
1008          return NULL;
1009      }
1010  
1011      peerProxy->peer = peer;
1012      peerProxy->credentials = credentials;
1013  
1014      // Initial state == expecting a header.
1015      peerProxyExpectHeader(peerProxy);
1016  
1017      // Add this proxy to the map. Make sure the key points to the stable memory
1018      // inside of the peer proxy itself.
1019      pid_t* pid = &(peerProxy->credentials.pid);
1020      hashmapPut(peer->peerProxies, pid, peerProxy);
1021      return peerProxy;
1022  }
1023  
1024  /** Accepts a connection to the master peer. */
masterAcceptConnection(SelectableFd * listenerFd)1025  static void masterAcceptConnection(SelectableFd* listenerFd) {
1026      // Accept connection.
1027      int socket = accept(listenerFd->fd, NULL, NULL);
1028      if (socket == -1) {
1029          ALOGW("accept() error: %s", strerror(errno));
1030          return;
1031      }
1032  
1033      ALOGD("Accepted connection as fd %d.", socket);
1034  
1035      // Get credentials.
1036      Credentials credentials;
1037      struct ucred ucredentials;
1038      socklen_t credentialsSize = sizeof(struct ucred);
1039      int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,
1040                  &ucredentials, &credentialsSize);
1041      // We might want to verify credentialsSize.
1042      if (result == -1) {
1043          ALOGW("getsockopt() error: %s", strerror(errno));
1044          closeWithWarning(socket);
1045          return;
1046      }
1047  
1048      // Copy values into our own structure so we know we have the types right.
1049      credentials.pid = ucredentials.pid;
1050      credentials.uid = ucredentials.uid;
1051      credentials.gid = ucredentials.gid;
1052  
1053      ALOGI("Accepted connection from process %d.", credentials.pid);
1054  
1055      Peer* masterPeer = (Peer*) listenerFd->data;
1056  
1057      peerLock(masterPeer);
1058  
1059      // Make sure we don't already have a connection from that process.
1060      PeerProxy* peerProxy
1061          = hashmapGet(masterPeer->peerProxies, &credentials.pid);
1062      if (peerProxy != NULL) {
1063          peerUnlock(masterPeer);
1064          ALOGW("Alread connected to process %d.", credentials.pid);
1065          closeWithWarning(socket);
1066          return;
1067      }
1068  
1069      // Add connection to the selector.
1070      SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
1071      if (socketFd == NULL) {
1072          peerUnlock(masterPeer);
1073          ALOGW("malloc() failed.");
1074          closeWithWarning(socket);
1075          return;
1076      }
1077  
1078      // Create a peer proxy.
1079      peerProxy = peerProxyCreate(masterPeer, credentials);
1080      peerUnlock(masterPeer);
1081      if (peerProxy == NULL) {
1082          ALOGW("malloc() failed.");
1083          socketFd->remove = true;
1084          closeWithWarning(socket);
1085      }
1086      peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
1087      peerProxySetFd(peerProxy, socketFd);
1088  }
1089  
1090  /**
1091   * Creates the local peer.
1092   */
peerCreate()1093  static Peer* peerCreate() {
1094      Peer* peer = calloc(1, sizeof(Peer));
1095      if (peer == NULL) {
1096          LOG_ALWAYS_FATAL("malloc() error.");
1097      }
1098      peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
1099      peer->selector = selectorCreate();
1100  
1101      pthread_mutexattr_t attributes;
1102      if (pthread_mutexattr_init(&attributes) != 0) {
1103          LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
1104      }
1105      if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
1106          LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
1107      }
1108      if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
1109          LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
1110      }
1111  
1112      peer->pid = getpid();
1113      return peer;
1114  }
1115  
/**
 * The local peer for this process. Assigned by masterPeerInitialize() or
 * peerInitialize(), both of which refuse to run if it is already set; the
 * send functions and peerLoop() assert that it is non-NULL.
 */
static Peer* localPeer;
1118  
1119  /** Frees a packet of bytes. */
outgoingPacketFreeBytes(OutgoingPacket * packet)1120  static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
1121      ALOGD("Freeing outgoing packet.");
1122      bufferFree(packet->bytes);
1123      free(packet);
1124  }
1125  
1126  /**
1127   * Sends a packet of bytes to a remote peer. Returns 0 on success.
1128   *
1129   * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
1130   * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
1131   * to EINVAL if pid is the same as the local pid.
1132   */
peerSendBytes(pid_t pid,const char * bytes,size_t size)1133  int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
1134  	Peer* peer = localPeer;
1135      assert(peer != NULL);
1136  
1137      OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
1138      if (packet == NULL) {
1139          errno = ENOMEM;
1140          return -1;
1141      }
1142  
1143      Buffer* copy = bufferCreate(size);
1144      if (copy == NULL) {
1145          free(packet);
1146          errno = ENOMEM;
1147          return -1;
1148      }
1149  
1150      // Copy data.
1151      memcpy(copy->data, bytes, size);
1152      copy->size = size;
1153  
1154      packet->bytes = copy;
1155      packet->header.type = BYTES;
1156      packet->header.size = size;
1157      packet->free = outgoingPacketFreeBytes;
1158      bufferPrepareForWrite(packet->bytes);
1159  
1160      peerLock(peer);
1161  
1162      PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
1163      if (peerProxy == NULL) {
1164          // The peer is already dead or we couldn't alloc memory. Either way,
1165          // errno is set.
1166          peerUnlock(peer);
1167          packet->free(packet);
1168          return -1;
1169      } else {
1170          peerProxyEnqueueOutgoingPacket(peerProxy, packet);
1171          peerUnlock(peer);
1172          selectorWakeUp(peer->selector);
1173          return 0;
1174      }
1175  }
1176  
/**
 * Keeps track of how to free shared bytes. Stored as the context of a packet
 * created by peerSendSharedBytes() and consumed when that packet is freed.
 */
typedef struct {
    /** Caller-supplied release function; invoked with context. */
    void (*free)(void* context);
    /** Opaque value handed back to the release function. */
    void* context;
} SharedBytesFreer;
1182  
1183  /** Frees shared bytes. */
outgoingPacketFreeSharedBytes(OutgoingPacket * packet)1184  static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
1185      SharedBytesFreer* sharedBytesFreer
1186          = (SharedBytesFreer*) packet->context;
1187      sharedBytesFreer->free(sharedBytesFreer->context);
1188      free(sharedBytesFreer);
1189      free(packet);
1190  }
1191  
1192  /**
1193   * Sends a packet of bytes to a remote peer without copying the bytes. Calls
1194   * free() with context after the bytes have been sent.
1195   *
1196   * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
1197   * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
1198   * to EINVAL if pid is the same as the local pid.
1199   */
peerSendSharedBytes(pid_t pid,char * bytes,size_t size,void (* free)(void * context),void * context)1200  int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
1201          void (*free)(void* context), void* context) {
1202      Peer* peer = localPeer;
1203      assert(peer != NULL);
1204  
1205      OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
1206      if (packet == NULL) {
1207          errno = ENOMEM;
1208          return -1;
1209      }
1210  
1211      Buffer* wrapper = bufferWrap(bytes, size, size);
1212      if (wrapper == NULL) {
1213          free(packet);
1214          errno = ENOMEM;
1215          return -1;
1216      }
1217  
1218      SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
1219      if (sharedBytesFreer == NULL) {
1220          free(packet);
1221          free(wrapper);
1222          errno = ENOMEM;
1223          return -1;
1224      }
1225      sharedBytesFreer->free = free;
1226      sharedBytesFreer->context = context;
1227  
1228      packet->bytes = wrapper;
1229      packet->context = sharedBytesFreer;
1230      packet->header.type = BYTES;
1231      packet->header.size = size;
1232      packet->free = &outgoingPacketFreeSharedBytes;
1233      bufferPrepareForWrite(packet->bytes);
1234  
1235      peerLock(peer);
1236  
1237      PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
1238      if (peerProxy == NULL) {
1239          // The peer is already dead or we couldn't alloc memory. Either way,
1240          // errno is set.
1241          peerUnlock(peer);
1242          packet->free(packet);
1243          return -1;
1244      } else {
1245          peerProxyEnqueueOutgoingPacket(peerProxy, packet);
1246          peerUnlock(peer);
1247          selectorWakeUp(peer->selector);
1248          return 0;
1249      }
1250  }
1251  
1252  /**
1253   * Starts the master peer. The master peer differs from other peers in that
1254   * it is responsible for connecting the other peers. You can only have one
1255   * master peer.
1256   *
1257   * Goes into an I/O loop and does not return.
1258   */
masterPeerInitialize(BytesListener * bytesListener,DeathListener * deathListener)1259  void masterPeerInitialize(BytesListener* bytesListener,
1260          DeathListener* deathListener) {
1261      // Create and bind socket.
1262      int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
1263      if (listenerSocket == -1) {
1264          LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
1265      }
1266      unlink(MASTER_PATH);
1267      int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),
1268              sizeof(UnixAddress));
1269      if (result == -1) {
1270          LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
1271      }
1272  
1273      ALOGD("Listener socket: %d",  listenerSocket);
1274  
1275      // Queue up to 16 connections.
1276      result = listen(listenerSocket, 16);
1277      if (result != 0) {
1278          LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
1279      }
1280  
1281      // Make socket non-blocking.
1282      setNonBlocking(listenerSocket);
1283  
1284      // Create the peer for this process. Fail if we already have one.
1285      if (localPeer != NULL) {
1286          LOG_ALWAYS_FATAL("Peer is already initialized.");
1287      }
1288      localPeer = peerCreate();
1289      if (localPeer == NULL) {
1290          LOG_ALWAYS_FATAL("malloc() failed.");
1291      }
1292      localPeer->master = true;
1293      localPeer->onBytes = bytesListener;
1294      localPeer->onDeath = deathListener;
1295  
1296      // Make listener socket selectable.
1297      SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
1298      if (listenerFd == NULL) {
1299          LOG_ALWAYS_FATAL("malloc() error.");
1300      }
1301      listenerFd->data = localPeer;
1302      listenerFd->onReadable = &masterAcceptConnection;
1303  }
1304  
1305  /**
1306   * Starts a local peer.
1307   *
1308   * Goes into an I/O loop and does not return.
1309   */
peerInitialize(BytesListener * bytesListener,DeathListener * deathListener)1310  void peerInitialize(BytesListener* bytesListener,
1311          DeathListener* deathListener) {
1312      // Connect to master peer.
1313      int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
1314      if (masterSocket == -1) {
1315          LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
1316      }
1317      int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
1318              sizeof(UnixAddress));
1319      if (result != 0) {
1320          LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
1321      }
1322  
1323      // Create the peer for this process. Fail if we already have one.
1324      if (localPeer != NULL) {
1325          LOG_ALWAYS_FATAL("Peer is already initialized.");
1326      }
1327      localPeer = peerCreate();
1328      if (localPeer == NULL) {
1329          LOG_ALWAYS_FATAL("malloc() failed.");
1330      }
1331      localPeer->onBytes = bytesListener;
1332      localPeer->onDeath = deathListener;
1333  
1334      // Make connection selectable.
1335      SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
1336      if (masterFd == NULL) {
1337          LOG_ALWAYS_FATAL("malloc() error.");
1338      }
1339  
1340      // Create a peer proxy for the master peer.
1341      PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
1342      if (masterProxy == NULL) {
1343          LOG_ALWAYS_FATAL("malloc() error.");
1344      }
1345      peerProxySetFd(masterProxy, masterFd);
1346      masterProxy->master = true;
1347      localPeer->masterProxy = masterProxy;
1348  }
1349  
1350  /** Starts the master peer I/O loop. Doesn't return. */
peerLoop()1351  void peerLoop() {
1352      assert(localPeer != NULL);
1353  
1354      // Start selector.
1355      selectorLoop(localPeer->selector);
1356  }
1357  
1358