• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #if defined(_MSC_VER) && _MSC_VER < 1300
29 #pragma warning(disable:4786)
30 #endif
31 
32 #include <cassert>
33 
34 #ifdef POSIX
35 #include <string.h>
36 #include <errno.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <unistd.h>
40 #include <signal.h>
41 #endif
42 
43 #ifdef WIN32
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <winsock2.h>
47 #include <ws2tcpip.h>
48 #undef SetPort
49 #endif
50 
51 #include <algorithm>
52 #include <map>
53 
54 #include "talk/base/basictypes.h"
55 #include "talk/base/byteorder.h"
56 #include "talk/base/common.h"
57 #include "talk/base/logging.h"
58 #include "talk/base/nethelpers.h"
59 #include "talk/base/physicalsocketserver.h"
60 #include "talk/base/time.h"
61 #include "talk/base/winping.h"
62 #include "talk/base/win32socketinit.h"
63 
64 // stm: this will tell us if we are on OSX
65 #ifdef HAVE_CONFIG_H
66 #include "config.h"
67 #endif
68 
69 #ifdef POSIX
70 #include <netinet/tcp.h>  // for TCP_NODELAY
71 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
72 typedef void* SockOptArg;
73 #endif  // POSIX
74 
75 #ifdef WIN32
76 typedef char* SockOptArg;
77 #endif
78 
79 namespace talk_base {
80 
81 // Standard MTUs, from RFC 1191
// Standard MTUs, from RFC 1191, listed from largest to smallest and
// terminated by a 0 entry. The WIN32 branch of PhysicalSocket::EstimateMTU()
// walks this table from the top, pinging with each size until one is not
// reported as too large. Commented-out entries are deliberately excluded from
// the probe sequence.
const uint16 PACKET_MAXIMUMS[] = {
  65535,    // Theoretical maximum, Hyperchannel
  32000,    // Nothing
  17914,    // 16Mb IBM Token Ring
  8166,     // IEEE 802.4
  //4464,   // IEEE 802.5 (4Mb max)
  4352,     // FDDI
  //2048,   // Wideband Network
  2002,     // IEEE 802.5 (4Mb recommended)
  //1536,   // Experimental Ethernet Networks
  //1500,   // Ethernet, Point-to-Point (default)
  1492,     // IEEE 802.3
  1006,     // SLIP, ARPANET
  //576,    // X.25 Networks
  //544,    // DEC IP Portal
  //512,    // NETBIOS
  508,      // IEEE 802/Source-Rt Bridge, ARCNET
  296,      // Point-to-Point (low delay)
  68,       // Official minimum
  0,        // End of list marker
};

// Header sizes subtracted from a candidate MTU in EstimateMTU() to obtain the
// ping payload size that would exactly fill a packet of that MTU.
const uint32 IP_HEADER_SIZE = 20;
const uint32 ICMP_HEADER_SIZE = 8;
106 
107 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
108  public:
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s=INVALID_SOCKET)109   PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
110     : ss_(ss), s_(s), enabled_events_(0), error_(0),
111       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
112       resolver_(NULL) {
113 #ifdef WIN32
114     // EnsureWinsockInit() ensures that winsock is initialized. The default
115     // version of this function doesn't do anything because winsock is
116     // initialized by constructor of a static object. If neccessary libjingle
117     // users can link it with a different version of this function by replacing
118     // win32socketinit.cc. See win32socketinit.cc for more details.
119     EnsureWinsockInit();
120 #endif
121     if (s_ != INVALID_SOCKET) {
122       enabled_events_ = DE_READ | DE_WRITE;
123 
124       int type = SOCK_STREAM;
125       socklen_t len = sizeof(type);
126       VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
127       udp_ = (SOCK_DGRAM == type);
128     }
129   }
130 
~PhysicalSocket()131   virtual ~PhysicalSocket() {
132     Close();
133   }
134 
135   // Creates the underlying OS socket (same as the "socket" function).
Create(int type)136   virtual bool Create(int type) {
137     Close();
138     s_ = ::socket(AF_INET, type, 0);
139     udp_ = (SOCK_DGRAM == type);
140     UpdateLastError();
141     if (udp_)
142       enabled_events_ = DE_READ | DE_WRITE;
143     return s_ != INVALID_SOCKET;
144   }
145 
GetLocalAddress() const146   SocketAddress GetLocalAddress() const {
147     sockaddr_in addr;
148     socklen_t addrlen = sizeof(addr);
149     int result = ::getsockname(s_, (sockaddr*)&addr, &addrlen);
150     SocketAddress address;
151     if (result >= 0) {
152       ASSERT(addrlen == sizeof(addr));
153       address.FromSockAddr(addr);
154     } else {
155       LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
156                       << s_;
157     }
158     return address;
159   }
160 
GetRemoteAddress() const161   SocketAddress GetRemoteAddress() const {
162     sockaddr_in addr;
163     socklen_t addrlen = sizeof(addr);
164     int result = ::getpeername(s_, (sockaddr*)&addr, &addrlen);
165     SocketAddress address;
166     if (result >= 0) {
167       ASSERT(addrlen == sizeof(addr));
168       address.FromSockAddr(addr);
169     } else {
170       LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
171                       << s_;
172     }
173     return address;
174   }
175 
Bind(const SocketAddress & addr)176   int Bind(const SocketAddress& addr) {
177     sockaddr_in saddr;
178     addr.ToSockAddr(&saddr);
179     int err = ::bind(s_, (sockaddr*)&saddr, sizeof(saddr));
180     UpdateLastError();
181 #ifdef _DEBUG
182     if (0 == err) {
183       dbg_addr_ = "Bound @ ";
184       dbg_addr_.append(GetLocalAddress().ToString());
185     }
186 #endif  // _DEBUG
187     return err;
188   }
189 
Connect(const SocketAddress & addr)190   int Connect(const SocketAddress& addr) {
191     // TODO: Implicit creation is required to reconnect...
192     // ...but should we make it more explicit?
193     if ((s_ == INVALID_SOCKET) && !Create(SOCK_STREAM))
194       return SOCKET_ERROR;
195     if (addr.IsUnresolved()) {
196       if (state_ != CS_CLOSED) {
197         SetError(EALREADY);
198         return SOCKET_ERROR;
199       }
200 
201       LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
202       resolver_ = new AsyncResolver();
203       resolver_->set_address(addr);
204       resolver_->SignalWorkDone.connect(this, &PhysicalSocket::OnResolveResult);
205       resolver_->Start();
206       state_ = CS_CONNECTING;
207       return 0;
208     }
209 
210     return DoConnect(addr);
211   }
212 
DoConnect(const SocketAddress & addr)213   int DoConnect(const SocketAddress& addr) {
214     sockaddr_in saddr;
215     addr.ToSockAddr(&saddr);
216     int err = ::connect(s_, (sockaddr*)&saddr, sizeof(saddr));
217     UpdateLastError();
218     if (err == 0) {
219       state_ = CS_CONNECTED;
220     } else if (IsBlockingError(error_)) {
221       state_ = CS_CONNECTING;
222       enabled_events_ |= DE_CONNECT;
223     } else {
224       return SOCKET_ERROR;
225     }
226 
227     enabled_events_ |= DE_READ | DE_WRITE;
228     return 0;
229   }
230 
GetError() const231   int GetError() const {
232     return error_;
233   }
234 
SetError(int error)235   void SetError(int error) {
236     error_ = error;
237   }
238 
GetState() const239   ConnState GetState() const {
240     return state_;
241   }
242 
GetOption(Option opt,int * value)243   int GetOption(Option opt, int* value) {
244     int slevel;
245     int sopt;
246     if (TranslateOption(opt, &slevel, &sopt) == -1)
247       return -1;
248     socklen_t optlen = sizeof(*value);
249     int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
250     if (ret != -1 && opt == OPT_DONTFRAGMENT) {
251 #ifdef LINUX
252       *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
253 #endif
254     }
255     return ret;
256   }
257 
SetOption(Option opt,int value)258   int SetOption(Option opt, int value) {
259     int slevel;
260     int sopt;
261     if (TranslateOption(opt, &slevel, &sopt) == -1)
262       return -1;
263     if (opt == OPT_DONTFRAGMENT) {
264 #ifdef LINUX
265       value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
266 #endif
267     }
268     return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
269   }
270 
Send(const void * pv,size_t cb)271   int Send(const void *pv, size_t cb) {
272     int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
273 #ifdef LINUX
274         // Suppress SIGPIPE. Without this, attempting to send on a socket whose
275         // other end is closed will result in a SIGPIPE signal being raised to
276         // our process, which by default will terminate the process, which we
277         // don't want. By specifying this flag, we'll just get the error EPIPE
278         // instead and can handle the error gracefully.
279         MSG_NOSIGNAL
280 #else
281         0
282 #endif
283         );
284     UpdateLastError();
285     // We have seen minidumps where this may be false.
286     ASSERT(sent <= static_cast<int>(cb));
287     if ((sent < 0) && IsBlockingError(error_)) {
288       enabled_events_ |= DE_WRITE;
289     }
290     return sent;
291   }
292 
SendTo(const void * pv,size_t cb,const SocketAddress & addr)293   int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
294     sockaddr_in saddr;
295     addr.ToSockAddr(&saddr);
296     int sent = ::sendto(
297         s_, (const char *)pv, (int)cb,
298 #ifdef LINUX
299         // Suppress SIGPIPE. See above for explanation.
300         MSG_NOSIGNAL,
301 #else
302         0,
303 #endif
304         (sockaddr*)&saddr, sizeof(saddr));
305     UpdateLastError();
306     // We have seen minidumps where this may be false.
307     ASSERT(sent <= static_cast<int>(cb));
308     if ((sent < 0) && IsBlockingError(error_)) {
309       enabled_events_ |= DE_WRITE;
310     }
311     return sent;
312   }
313 
Recv(void * pv,size_t cb)314   int Recv(void *pv, size_t cb) {
315     int received = ::recv(s_, (char *)pv, (int)cb, 0);
316     if ((received == 0) && (cb != 0)) {
317       // Note: on graceful shutdown, recv can return 0.  In this case, we
318       // pretend it is blocking, and then signal close, so that simplifying
319       // assumptions can be made about Recv.
320       LOG(LS_WARNING) << "EOF from socket; deferring close event";
321       // Must turn this back on so that the select() loop will notice the close
322       // event.
323       enabled_events_ |= DE_READ;
324       error_ = EWOULDBLOCK;
325       return SOCKET_ERROR;
326     }
327     UpdateLastError();
328     bool success = (received >= 0) || IsBlockingError(error_);
329     if (udp_ || success) {
330       enabled_events_ |= DE_READ;
331     }
332     if (!success) {
333       LOG_F(LS_VERBOSE) << "Error = " << error_;
334     }
335     return received;
336   }
337 
RecvFrom(void * pv,size_t cb,SocketAddress * paddr)338   int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
339     sockaddr_in saddr;
340     socklen_t cbAddr = sizeof(saddr);
341     int received = ::recvfrom(s_, (char *)pv, (int)cb, 0, (sockaddr*)&saddr,
342                               &cbAddr);
343     UpdateLastError();
344     if ((received >= 0) && (paddr != NULL))
345       paddr->FromSockAddr(saddr);
346     bool success = (received >= 0) || IsBlockingError(error_);
347     if (udp_ || success) {
348       enabled_events_ |= DE_READ;
349     }
350     if (!success) {
351       LOG_F(LS_VERBOSE) << "Error = " << error_;
352     }
353     return received;
354   }
355 
Listen(int backlog)356   int Listen(int backlog) {
357     int err = ::listen(s_, backlog);
358     UpdateLastError();
359     if (err == 0) {
360       state_ = CS_CONNECTING;
361       enabled_events_ |= DE_ACCEPT;
362 #ifdef _DEBUG
363       dbg_addr_ = "Listening @ ";
364       dbg_addr_.append(GetLocalAddress().ToString());
365 #endif  // _DEBUG
366     }
367     return err;
368   }
369 
Accept(SocketAddress * paddr)370   AsyncSocket* Accept(SocketAddress *paddr) {
371     sockaddr_in saddr;
372     socklen_t cbAddr = sizeof(saddr);
373     SOCKET s = ::accept(s_, (sockaddr*)&saddr, &cbAddr);
374     UpdateLastError();
375     if (s == INVALID_SOCKET)
376       return NULL;
377     enabled_events_ |= DE_ACCEPT;
378     if (paddr != NULL)
379       paddr->FromSockAddr(saddr);
380     return ss_->WrapSocket(s);
381   }
382 
Close()383   int Close() {
384     if (s_ == INVALID_SOCKET)
385       return 0;
386     int err = ::closesocket(s_);
387     UpdateLastError();
388     s_ = INVALID_SOCKET;
389     state_ = CS_CLOSED;
390     enabled_events_ = 0;
391     if (resolver_) {
392       resolver_->Destroy(false);
393       resolver_ = NULL;
394     }
395     return err;
396   }
397 
EstimateMTU(uint16 * mtu)398   int EstimateMTU(uint16* mtu) {
399     SocketAddress addr = GetRemoteAddress();
400     if (addr.IsAny()) {
401       error_ = ENOTCONN;
402       return -1;
403     }
404 
405 #if defined(WIN32)
406     // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
407     WinPing ping;
408     if (!ping.IsValid()) {
409       error_ = EINVAL; // can't think of a better error ID
410       return -1;
411     }
412 
413     for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
414       int32 size = PACKET_MAXIMUMS[level] - IP_HEADER_SIZE - ICMP_HEADER_SIZE;
415       WinPing::PingResult result = ping.Ping(addr.ip(), size, 0, 1, false);
416       if (result == WinPing::PING_FAIL) {
417         error_ = EINVAL; // can't think of a better error ID
418         return -1;
419       } else if (result != WinPing::PING_TOO_LARGE) {
420         *mtu = PACKET_MAXIMUMS[level];
421         return 0;
422       }
423     }
424 
425     ASSERT(false);
426     return -1;
427 #elif defined(IOS) || defined(OSX)
428     // No simple way to do this on Mac OS X.
429     // SIOCGIFMTU would work if we knew which interface would be used, but
430     // figuring that out is pretty complicated. For now we'll return an error
431     // and let the caller pick a default MTU.
432     error_ = EINVAL;
433     return -1;
434 #elif defined(LINUX) || defined(ANDROID)
435     // Gets the path MTU.
436     int value;
437     socklen_t vlen = sizeof(value);
438     int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
439     if (err < 0) {
440       UpdateLastError();
441       return err;
442     }
443 
444     ASSERT((0 <= value) && (value <= 65536));
445     *mtu = value;
446     return 0;
447 #endif
448   }
449 
socketserver()450   SocketServer* socketserver() { return ss_; }
451 
452  protected:
OnResolveResult(SignalThread * thread)453   void OnResolveResult(SignalThread* thread) {
454     if (thread != resolver_) {
455       return;
456     }
457 
458     int error = resolver_->error();
459     if (error == 0) {
460       error = DoConnect(resolver_->address());
461     } else {
462       Close();
463     }
464 
465     if (error) {
466       error_ = error;
467       SignalCloseEvent(this, error_);
468     }
469   }
470 
UpdateLastError()471   void UpdateLastError() {
472     error_ = LAST_SYSTEM_ERROR;
473   }
474 
TranslateOption(Option opt,int * slevel,int * sopt)475   static int TranslateOption(Option opt, int* slevel, int* sopt) {
476     switch (opt) {
477       case OPT_DONTFRAGMENT:
478 #ifdef WIN32
479         *slevel = IPPROTO_IP;
480         *sopt = IP_DONTFRAGMENT;
481         break;
482 #elif defined(IOS) || defined(OSX) || defined(BSD)
483         LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
484         return -1;
485 #elif defined(POSIX)
486         *slevel = IPPROTO_IP;
487         *sopt = IP_MTU_DISCOVER;
488         break;
489 #endif
490       case OPT_RCVBUF:
491         *slevel = SOL_SOCKET;
492         *sopt = SO_RCVBUF;
493         break;
494       case OPT_SNDBUF:
495         *slevel = SOL_SOCKET;
496         *sopt = SO_SNDBUF;
497         break;
498       case OPT_NODELAY:
499         *slevel = IPPROTO_TCP;
500         *sopt = TCP_NODELAY;
501         break;
502       default:
503         ASSERT(false);
504         return -1;
505     }
506     return 0;
507   }
508 
509   PhysicalSocketServer* ss_;
510   SOCKET s_;
511   uint8 enabled_events_;
512   bool udp_;
513   int error_;
514   ConnState state_;
515   AsyncResolver* resolver_;
516 
517 #ifdef _DEBUG
518   std::string dbg_addr_;
519 #endif  // _DEBUG;
520 };
521 
522 #ifdef POSIX
523 class EventDispatcher : public Dispatcher {
524  public:
EventDispatcher(PhysicalSocketServer * ss)525   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
526     if (pipe(afd_) < 0)
527       LOG(LERROR) << "pipe failed";
528     ss_->Add(this);
529   }
530 
~EventDispatcher()531   virtual ~EventDispatcher() {
532     ss_->Remove(this);
533     close(afd_[0]);
534     close(afd_[1]);
535   }
536 
Signal()537   virtual void Signal() {
538     CritScope cs(&crit_);
539     if (!fSignaled_) {
540       const uint8 b[1] = { 0 };
541       if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
542         fSignaled_ = true;
543       }
544     }
545   }
546 
GetRequestedEvents()547   virtual uint32 GetRequestedEvents() {
548     return DE_READ;
549   }
550 
OnPreEvent(uint32 ff)551   virtual void OnPreEvent(uint32 ff) {
552     // It is not possible to perfectly emulate an auto-resetting event with
553     // pipes.  This simulates it by resetting before the event is handled.
554 
555     CritScope cs(&crit_);
556     if (fSignaled_) {
557       uint8 b[4];  // Allow for reading more than 1 byte, but expect 1.
558       VERIFY(1 == read(afd_[0], b, sizeof(b)));
559       fSignaled_ = false;
560     }
561   }
562 
OnEvent(uint32 ff,int err)563   virtual void OnEvent(uint32 ff, int err) {
564     ASSERT(false);
565   }
566 
GetDescriptor()567   virtual int GetDescriptor() {
568     return afd_[0];
569   }
570 
IsDescriptorClosed()571   virtual bool IsDescriptorClosed() {
572     return false;
573   }
574 
575  private:
576   PhysicalSocketServer *ss_;
577   int afd_[2];
578   bool fSignaled_;
579   CriticalSection crit_;
580 };
581 
582 // These two classes use the self-pipe trick to deliver POSIX signals to our
583 // select loop. This is the only safe, reliable, cross-platform way to do
584 // non-trivial things with a POSIX signal in an event-driven program (until
585 // proper pselect() implementations become ubiquitous).
586 
587 class PosixSignalHandler {
588  public:
589   // POSIX only specifies 32 signals, but in principle the system might have
590   // more and the programmer might choose to use them, so we size our array
591   // for 128.
592   static const int kNumPosixSignals = 128;
593 
Instance()594   static PosixSignalHandler *Instance() { return &instance_; }
595 
596   // Returns true if the given signal number is set.
IsSignalSet(int signum) const597   bool IsSignalSet(int signum) const {
598     ASSERT(signum < ARRAY_SIZE(received_signal_));
599     if (signum < ARRAY_SIZE(received_signal_)) {
600       return received_signal_[signum];
601     } else {
602       return false;
603     }
604   }
605 
606   // Clears the given signal number.
ClearSignal(int signum)607   void ClearSignal(int signum) {
608     ASSERT(signum < ARRAY_SIZE(received_signal_));
609     if (signum < ARRAY_SIZE(received_signal_)) {
610       received_signal_[signum] = false;
611     }
612   }
613 
614   // Returns the file descriptor to monitor for signal events.
GetDescriptor() const615   int GetDescriptor() const {
616     return afd_[0];
617   }
618 
619   // This is called directly from our real signal handler, so it must be
620   // signal-handler-safe. That means it cannot assume anything about the
621   // user-level state of the process, since the handler could be executed at any
622   // time on any thread.
OnPosixSignalReceived(int signum)623   void OnPosixSignalReceived(int signum) {
624     if (signum >= ARRAY_SIZE(received_signal_)) {
625       // We don't have space in our array for this.
626       return;
627     }
628     // Set a flag saying we've seen this signal.
629     received_signal_[signum] = true;
630     // Notify application code that we got a signal.
631     const uint8 b[1] = { 0 };
632     if (-1 == write(afd_[1], b, sizeof(b))) {
633       // Nothing we can do here. If there's an error somehow then there's
634       // nothing we can safely do from a signal handler.
635       // No, we can't even safely log it.
636       // But, we still have to check the return value here. Otherwise,
637       // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
638       return;
639     }
640   }
641 
642  private:
PosixSignalHandler()643   PosixSignalHandler() {
644     if (pipe(afd_) < 0) {
645       LOG_ERR(LS_ERROR) << "pipe failed";
646       return;
647     }
648     if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
649       LOG_ERR(LS_WARNING) << "fcntl #1 failed";
650     }
651     if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
652       LOG_ERR(LS_WARNING) << "fcntl #2 failed";
653     }
654     memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
655            0,
656            sizeof(received_signal_));
657   }
658 
~PosixSignalHandler()659   ~PosixSignalHandler() {
660     int fd1 = afd_[0];
661     int fd2 = afd_[1];
662     // We clobber the stored file descriptor numbers here or else in principle
663     // a signal that happens to be delivered during application termination
664     // could erroneously write a zero byte to an unrelated file handle in
665     // OnPosixSignalReceived() if some other file happens to be opened later
666     // during shutdown and happens to be given the same file descriptor number
667     // as our pipe had. Unfortunately even with this precaution there is still a
668     // race where that could occur if said signal happens to be handled
669     // concurrently with this code and happens to have already read the value of
670     // afd_[1] from memory before we clobber it, but that's unlikely.
671     afd_[0] = -1;
672     afd_[1] = -1;
673     close(fd1);
674     close(fd2);
675   }
676 
677   // There is just a single global instance. (Signal handlers do not get any
678   // sort of user-defined void * parameter, so they can't access anything that
679   // isn't global.)
680   static PosixSignalHandler instance_;
681 
682   int afd_[2];
683   // These are boolean flags that will be set in our signal handler and read
684   // and cleared from Wait(). There is a race involved in this, but it is
685   // benign. The signal handler sets the flag before signaling the pipe, so
686   // we'll never end up blocking in select() while a flag is still true.
687   // However, if two of the same signal arrive close to each other then it's
688   // possible that the second time the handler may set the flag while it's still
689   // true, meaning that signal will be missed. But the first occurrence of it
690   // will still be handled, so this isn't a problem.
691   // Volatile is not necessary here for correctness, but this data _is_ volatile
692   // so I've marked it as such.
693   volatile uint8 received_signal_[kNumPosixSignals];
694 };
695 
696 PosixSignalHandler PosixSignalHandler::instance_;
697 
698 class PosixSignalDispatcher : public Dispatcher {
699  public:
PosixSignalDispatcher(PhysicalSocketServer * owner)700   PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
701     owner_->Add(this);
702   }
703 
~PosixSignalDispatcher()704   virtual ~PosixSignalDispatcher() {
705     owner_->Remove(this);
706   }
707 
GetRequestedEvents()708   virtual uint32 GetRequestedEvents() {
709     return DE_READ;
710   }
711 
OnPreEvent(uint32 ff)712   virtual void OnPreEvent(uint32 ff) {
713     // Events might get grouped if signals come very fast, so we read out up to
714     // 16 bytes to make sure we keep the pipe empty.
715     uint8 b[16];
716     ssize_t ret = read(GetDescriptor(), b, sizeof(b));
717     if (ret < 0) {
718       LOG_ERR(LS_WARNING) << "Error in read()";
719     } else if (ret == 0) {
720       LOG(LS_WARNING) << "Should have read at least one byte";
721     }
722   }
723 
OnEvent(uint32 ff,int err)724   virtual void OnEvent(uint32 ff, int err) {
725     for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
726          ++signum) {
727       if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
728         PosixSignalHandler::Instance()->ClearSignal(signum);
729         HandlerMap::iterator i = handlers_.find(signum);
730         if (i == handlers_.end()) {
731           // This can happen if a signal is delivered to our process at around
732           // the same time as we unset our handler for it. It is not an error
733           // condition, but it's unusual enough to be worth logging.
734           LOG(LS_INFO) << "Received signal with no handler: " << signum;
735         } else {
736           // Otherwise, execute our handler.
737           (*i->second)(signum);
738         }
739       }
740     }
741   }
742 
GetDescriptor()743   virtual int GetDescriptor() {
744     return PosixSignalHandler::Instance()->GetDescriptor();
745   }
746 
IsDescriptorClosed()747   virtual bool IsDescriptorClosed() {
748     return false;
749   }
750 
SetHandler(int signum,void (* handler)(int))751   void SetHandler(int signum, void (*handler)(int)) {
752     handlers_[signum] = handler;
753   }
754 
ClearHandler(int signum)755   void ClearHandler(int signum) {
756     handlers_.erase(signum);
757   }
758 
HasHandlers()759   bool HasHandlers() {
760     return !handlers_.empty();
761   }
762 
763  private:
764   typedef std::map<int, void (*)(int)> HandlerMap;
765 
766   HandlerMap handlers_;
767   // Our owner.
768   PhysicalSocketServer *owner_;
769 };
770 
771 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
772  public:
SocketDispatcher(PhysicalSocketServer * ss)773   explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
774   }
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)775   SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
776   }
777 
~SocketDispatcher()778   virtual ~SocketDispatcher() {
779     Close();
780   }
781 
Initialize()782   bool Initialize() {
783     ss_->Add(this);
784     fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
785     return true;
786   }
787 
Create(int type)788   virtual bool Create(int type) {
789     // Change the socket to be non-blocking.
790     if (!PhysicalSocket::Create(type))
791       return false;
792 
793     return Initialize();
794   }
795 
GetDescriptor()796   virtual int GetDescriptor() {
797     return s_;
798   }
799 
IsDescriptorClosed()800   virtual bool IsDescriptorClosed() {
801     // We don't have a reliable way of distinguishing end-of-stream
802     // from readability.  So test on each readable call.  Is this
803     // inefficient?  Probably.
804     char ch;
805     ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
806     if (res > 0) {
807       // Data available, so not closed.
808       return false;
809     } else if (res == 0) {
810       // EOF, so closed.
811       return true;
812     } else {  // error
813       switch (errno) {
814         // Returned if we've already closed s_.
815         case EBADF:
816         // Returned during ungraceful peer shutdown.
817         case ECONNRESET:
818           return true;
819         default:
820           // Assume that all other errors are just blocking errors, meaning the
821           // connection is still good but we just can't read from it right now.
822           // This should only happen when connecting (and at most once), because
823           // in all other cases this function is only called if the file
824           // descriptor is already known to be in the readable state. However,
825           // it's not necessary a problem if we spuriously interpret a
826           // "connection lost"-type error as a blocking error, because typically
827           // the next recv() will get EOF, so we'll still eventually notice that
828           // the socket is closed.
829           LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
830           return false;
831       }
832     }
833   }
834 
GetRequestedEvents()835   virtual uint32 GetRequestedEvents() {
836     return enabled_events_;
837   }
838 
OnPreEvent(uint32 ff)839   virtual void OnPreEvent(uint32 ff) {
840     if ((ff & DE_CONNECT) != 0)
841       state_ = CS_CONNECTED;
842     if ((ff & DE_CLOSE) != 0)
843       state_ = CS_CLOSED;
844   }
845 
OnEvent(uint32 ff,int err)846   virtual void OnEvent(uint32 ff, int err) {
847     if ((ff & DE_READ) != 0) {
848       enabled_events_ &= ~DE_READ;
849       SignalReadEvent(this);
850     }
851     if ((ff & DE_WRITE) != 0) {
852       enabled_events_ &= ~DE_WRITE;
853       SignalWriteEvent(this);
854     }
855     if ((ff & DE_CONNECT) != 0) {
856       enabled_events_ &= ~DE_CONNECT;
857       SignalConnectEvent(this);
858     }
859     if ((ff & DE_ACCEPT) != 0) {
860       enabled_events_ &= ~DE_ACCEPT;
861       SignalReadEvent(this);
862     }
863     if ((ff & DE_CLOSE) != 0) {
864       // The socket is now dead to us, so stop checking it.
865       enabled_events_ = 0;
866       SignalCloseEvent(this, err);
867     }
868   }
869 
Close()870   virtual int Close() {
871     if (s_ == INVALID_SOCKET)
872       return 0;
873 
874     ss_->Remove(this);
875     return PhysicalSocket::Close();
876   }
877 };
878 
879 class FileDispatcher: public Dispatcher, public AsyncFile {
880  public:
FileDispatcher(int fd,PhysicalSocketServer * ss)881   FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
882     set_readable(true);
883 
884     ss_->Add(this);
885 
886     fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
887   }
888 
~FileDispatcher()889   virtual ~FileDispatcher() {
890     ss_->Remove(this);
891   }
892 
socketserver()893   SocketServer* socketserver() { return ss_; }
894 
GetDescriptor()895   virtual int GetDescriptor() {
896     return fd_;
897   }
898 
IsDescriptorClosed()899   virtual bool IsDescriptorClosed() {
900     return false;
901   }
902 
GetRequestedEvents()903   virtual uint32 GetRequestedEvents() {
904     return flags_;
905   }
906 
OnPreEvent(uint32 ff)907   virtual void OnPreEvent(uint32 ff) {
908   }
909 
OnEvent(uint32 ff,int err)910   virtual void OnEvent(uint32 ff, int err) {
911     if ((ff & DE_READ) != 0)
912       SignalReadEvent(this);
913     if ((ff & DE_WRITE) != 0)
914       SignalWriteEvent(this);
915     if ((ff & DE_CLOSE) != 0)
916       SignalCloseEvent(this, err);
917   }
918 
readable()919   virtual bool readable() {
920     return (flags_ & DE_READ) != 0;
921   }
922 
set_readable(bool value)923   virtual void set_readable(bool value) {
924     flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
925   }
926 
writable()927   virtual bool writable() {
928     return (flags_ & DE_WRITE) != 0;
929   }
930 
set_writable(bool value)931   virtual void set_writable(bool value) {
932     flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
933   }
934 
935  private:
936   PhysicalSocketServer* ss_;
937   int fd_;
938   int flags_;
939 };
940 
CreateFile(int fd)941 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
942   return new FileDispatcher(fd, this);
943 }
944 
945 #endif // POSIX
946 
947 #ifdef WIN32
FlagsToEvents(uint32 events)948 static uint32 FlagsToEvents(uint32 events) {
949   uint32 ffFD = FD_CLOSE;
950   if (events & DE_READ)
951     ffFD |= FD_READ;
952   if (events & DE_WRITE)
953     ffFD |= FD_WRITE;
954   if (events & DE_CONNECT)
955     ffFD |= FD_CONNECT;
956   if (events & DE_ACCEPT)
957     ffFD |= FD_ACCEPT;
958   return ffFD;
959 }
960 
961 class EventDispatcher : public Dispatcher {
962  public:
EventDispatcher(PhysicalSocketServer * ss)963   EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
964     hev_ = WSACreateEvent();
965     if (hev_) {
966       ss_->Add(this);
967     }
968   }
969 
~EventDispatcher()970   ~EventDispatcher() {
971     if (hev_ != NULL) {
972       ss_->Remove(this);
973       WSACloseEvent(hev_);
974       hev_ = NULL;
975     }
976   }
977 
Signal()978   virtual void Signal() {
979     if (hev_ != NULL)
980       WSASetEvent(hev_);
981   }
982 
GetRequestedEvents()983   virtual uint32 GetRequestedEvents() {
984     return 0;
985   }
986 
OnPreEvent(uint32 ff)987   virtual void OnPreEvent(uint32 ff) {
988     WSAResetEvent(hev_);
989   }
990 
OnEvent(uint32 ff,int err)991   virtual void OnEvent(uint32 ff, int err) {
992   }
993 
GetWSAEvent()994   virtual WSAEVENT GetWSAEvent() {
995     return hev_;
996   }
997 
GetSocket()998   virtual SOCKET GetSocket() {
999     return INVALID_SOCKET;
1000   }
1001 
CheckSignalClose()1002   virtual bool CheckSignalClose() { return false; }
1003 
1004 private:
1005   PhysicalSocketServer* ss_;
1006   WSAEVENT hev_;
1007 };
1008 
1009 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1010  public:
1011   static int next_id_;
1012   int id_;
1013   bool signal_close_;
1014   int signal_err_;
1015 
SocketDispatcher(PhysicalSocketServer * ss)1016   SocketDispatcher(PhysicalSocketServer* ss)
1017       : PhysicalSocket(ss),
1018         id_(0),
1019         signal_close_(false) {
1020   }
1021 
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)1022   SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1023       : PhysicalSocket(ss, s),
1024         id_(0),
1025         signal_close_(false) {
1026   }
1027 
~SocketDispatcher()1028   virtual ~SocketDispatcher() {
1029     Close();
1030   }
1031 
Initialize()1032   bool Initialize() {
1033     ASSERT(s_ != INVALID_SOCKET);
1034     // Must be a non-blocking
1035     u_long argp = 1;
1036     ioctlsocket(s_, FIONBIO, &argp);
1037     ss_->Add(this);
1038     return true;
1039   }
1040 
Create(int type)1041   virtual bool Create(int type) {
1042     // Create socket
1043     if (!PhysicalSocket::Create(type))
1044       return false;
1045 
1046     if (!Initialize())
1047       return false;
1048 
1049     do { id_ = ++next_id_; } while (id_ == 0);
1050     return true;
1051   }
1052 
Close()1053   virtual int Close() {
1054     if (s_ == INVALID_SOCKET)
1055       return 0;
1056 
1057     id_ = 0;
1058     signal_close_ = false;
1059     ss_->Remove(this);
1060     return PhysicalSocket::Close();
1061   }
1062 
GetRequestedEvents()1063   virtual uint32 GetRequestedEvents() {
1064     return enabled_events_;
1065   }
1066 
OnPreEvent(uint32 ff)1067   virtual void OnPreEvent(uint32 ff) {
1068     if ((ff & DE_CONNECT) != 0)
1069       state_ = CS_CONNECTED;
1070     // We set CS_CLOSED from CheckSignalClose.
1071   }
1072 
OnEvent(uint32 ff,int err)1073   virtual void OnEvent(uint32 ff, int err) {
1074     int cache_id = id_;
1075     if ((ff & DE_READ) != 0) {
1076       enabled_events_ &= ~DE_READ;
1077       SignalReadEvent(this);
1078     }
1079     if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1080       enabled_events_ &= ~DE_WRITE;
1081       SignalWriteEvent(this);
1082     }
1083     if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1084       if (ff != DE_CONNECT)
1085         LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1086       enabled_events_ &= ~DE_CONNECT;
1087 #ifdef _DEBUG
1088       dbg_addr_ = "Connected @ ";
1089       dbg_addr_.append(GetRemoteAddress().ToString());
1090 #endif  // _DEBUG
1091       SignalConnectEvent(this);
1092     }
1093     if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1094       enabled_events_ &= ~DE_ACCEPT;
1095       SignalReadEvent(this);
1096     }
1097     if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1098       signal_close_ = true;
1099       signal_err_ = err;
1100     }
1101   }
1102 
GetWSAEvent()1103   virtual WSAEVENT GetWSAEvent() {
1104     return WSA_INVALID_EVENT;
1105   }
1106 
GetSocket()1107   virtual SOCKET GetSocket() {
1108     return s_;
1109   }
1110 
CheckSignalClose()1111   virtual bool CheckSignalClose() {
1112     if (!signal_close_)
1113       return false;
1114 
1115     char ch;
1116     if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1117       return false;
1118 
1119     state_ = CS_CLOSED;
1120     signal_close_ = false;
1121     SignalCloseEvent(this, signal_err_);
1122     return true;
1123   }
1124 };
1125 
1126 int SocketDispatcher::next_id_ = 0;
1127 
1128 #endif  // WIN32
1129 
1130 // Sets the value of a boolean value to false when signaled.
1131 class Signaler : public EventDispatcher {
1132  public:
Signaler(PhysicalSocketServer * ss,bool * pf)1133   Signaler(PhysicalSocketServer* ss, bool* pf)
1134       : EventDispatcher(ss), pf_(pf) {
1135   }
~Signaler()1136   virtual ~Signaler() { }
1137 
OnEvent(uint32 ff,int err)1138   void OnEvent(uint32 ff, int err) {
1139     if (pf_)
1140       *pf_ = false;
1141   }
1142 
1143  private:
1144   bool *pf_;
1145 };
1146 
PhysicalSocketServer()1147 PhysicalSocketServer::PhysicalSocketServer()
1148     : fWait_(false),
1149       last_tick_tracked_(0),
1150       last_tick_dispatch_count_(0) {
1151   signal_wakeup_ = new Signaler(this, &fWait_);
1152 #ifdef WIN32
1153   socket_ev_ = WSACreateEvent();
1154 #endif
1155 }
1156 
~PhysicalSocketServer()1157 PhysicalSocketServer::~PhysicalSocketServer() {
1158 #ifdef WIN32
1159   WSACloseEvent(socket_ev_);
1160 #endif
1161 #ifdef POSIX
1162   signal_dispatcher_.reset();
1163 #endif
1164   delete signal_wakeup_;
1165   ASSERT(dispatchers_.empty());
1166 }
1167 
WakeUp()1168 void PhysicalSocketServer::WakeUp() {
1169   signal_wakeup_->Signal();
1170 }
1171 
CreateSocket(int type)1172 Socket* PhysicalSocketServer::CreateSocket(int type) {
1173   PhysicalSocket* socket = new PhysicalSocket(this);
1174   if (socket->Create(type)) {
1175     return socket;
1176   } else {
1177     delete socket;
1178     return 0;
1179   }
1180 }
1181 
CreateAsyncSocket(int type)1182 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1183   SocketDispatcher* dispatcher = new SocketDispatcher(this);
1184   if (dispatcher->Create(type)) {
1185     return dispatcher;
1186   } else {
1187     delete dispatcher;
1188     return 0;
1189   }
1190 }
1191 
WrapSocket(SOCKET s)1192 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1193   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1194   if (dispatcher->Initialize()) {
1195     return dispatcher;
1196   } else {
1197     delete dispatcher;
1198     return 0;
1199   }
1200 }
1201 
Add(Dispatcher * pdispatcher)1202 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1203   CritScope cs(&crit_);
1204   // Prevent duplicates. This can cause dead dispatchers to stick around.
1205   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1206                                            dispatchers_.end(),
1207                                            pdispatcher);
1208   if (pos != dispatchers_.end())
1209     return;
1210   dispatchers_.push_back(pdispatcher);
1211 }
1212 
Remove(Dispatcher * pdispatcher)1213 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1214   CritScope cs(&crit_);
1215   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1216                                            dispatchers_.end(),
1217                                            pdispatcher);
1218   ASSERT(pos != dispatchers_.end());
1219   size_t index = pos - dispatchers_.begin();
1220   dispatchers_.erase(pos);
1221   for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1222        ++it) {
1223     if (index < **it) {
1224       --**it;
1225     }
1226   }
1227 }
1228 
1229 #ifdef POSIX
Wait(int cmsWait,bool process_io)1230 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1231   // Calculate timing information
1232 
1233   struct timeval *ptvWait = NULL;
1234   struct timeval tvWait;
1235   struct timeval tvStop;
1236   if (cmsWait != kForever) {
1237     // Calculate wait timeval
1238     tvWait.tv_sec = cmsWait / 1000;
1239     tvWait.tv_usec = (cmsWait % 1000) * 1000;
1240     ptvWait = &tvWait;
1241 
1242     // Calculate when to return in a timeval
1243     gettimeofday(&tvStop, NULL);
1244     tvStop.tv_sec += tvWait.tv_sec;
1245     tvStop.tv_usec += tvWait.tv_usec;
1246     if (tvStop.tv_usec >= 1000000) {
1247       tvStop.tv_usec -= 1000000;
1248       tvStop.tv_sec += 1;
1249     }
1250   }
1251 
1252   // Zero all fd_sets. Don't need to do this inside the loop since
1253   // select() zeros the descriptors not signaled
1254 
1255   fd_set fdsRead;
1256   FD_ZERO(&fdsRead);
1257   fd_set fdsWrite;
1258   FD_ZERO(&fdsWrite);
1259 
1260   fWait_ = true;
1261 
1262   while (fWait_) {
1263     int fdmax = -1;
1264     {
1265       CritScope cr(&crit_);
1266       for (size_t i = 0; i < dispatchers_.size(); ++i) {
1267         // Query dispatchers for read and write wait state
1268         Dispatcher *pdispatcher = dispatchers_[i];
1269         ASSERT(pdispatcher);
1270         if (!process_io && (pdispatcher != signal_wakeup_))
1271           continue;
1272         int fd = pdispatcher->GetDescriptor();
1273         if (fd > fdmax)
1274           fdmax = fd;
1275 
1276         uint32 ff = pdispatcher->GetRequestedEvents();
1277         if (ff & (DE_READ | DE_ACCEPT))
1278           FD_SET(fd, &fdsRead);
1279         if (ff & (DE_WRITE | DE_CONNECT))
1280           FD_SET(fd, &fdsWrite);
1281       }
1282     }
1283 
1284     // Wait then call handlers as appropriate
1285     // < 0 means error
1286     // 0 means timeout
1287     // > 0 means count of descriptors ready
1288     int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
1289 
1290     // If error, return error.
1291     if (n < 0) {
1292       if (errno != EINTR) {
1293         LOG_E(LS_ERROR, EN, errno) << "select";
1294         return false;
1295       }
1296       // Else ignore the error and keep going. If this EINTR was for one of the
1297       // signals managed by this PhysicalSocketServer, the
1298       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1299       // iteration.
1300     } else if (n == 0) {
1301       // If timeout, return success
1302       return true;
1303     } else {
1304       // We have signaled descriptors
1305       CritScope cr(&crit_);
1306       for (size_t i = 0; i < dispatchers_.size(); ++i) {
1307         Dispatcher *pdispatcher = dispatchers_[i];
1308         int fd = pdispatcher->GetDescriptor();
1309         uint32 ff = 0;
1310         int errcode = 0;
1311 
1312         // Reap any error code, which can be signaled through reads or writes.
1313         // TODO: Should we set errcode if getsockopt fails?
1314         if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1315           socklen_t len = sizeof(errcode);
1316           ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1317         }
1318 
1319         // Check readable descriptors. If we're waiting on an accept, signal
1320         // that. Otherwise we're waiting for data, check to see if we're
1321         // readable or really closed.
1322         // TODO: Only peek at TCP descriptors.
1323         if (FD_ISSET(fd, &fdsRead)) {
1324           FD_CLR(fd, &fdsRead);
1325           if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1326             ff |= DE_ACCEPT;
1327           } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1328             ff |= DE_CLOSE;
1329           } else {
1330             ff |= DE_READ;
1331           }
1332         }
1333 
1334         // Check writable descriptors. If we're waiting on a connect, detect
1335         // success versus failure by the reaped error code.
1336         if (FD_ISSET(fd, &fdsWrite)) {
1337           FD_CLR(fd, &fdsWrite);
1338           if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1339             if (!errcode) {
1340               ff |= DE_CONNECT;
1341             } else {
1342               ff |= DE_CLOSE;
1343             }
1344           } else {
1345             ff |= DE_WRITE;
1346           }
1347         }
1348 
1349         // Tell the descriptor about the event.
1350         if (ff != 0) {
1351           pdispatcher->OnPreEvent(ff);
1352           pdispatcher->OnEvent(ff, errcode);
1353         }
1354       }
1355     }
1356 
1357     // Recalc the time remaining to wait. Doing it here means it doesn't get
1358     // calced twice the first time through the loop
1359 
1360     if (cmsWait != kForever) {
1361       ptvWait->tv_sec = 0;
1362       ptvWait->tv_usec = 0;
1363       struct timeval tvT;
1364       gettimeofday(&tvT, NULL);
1365       if ((tvStop.tv_sec > tvT.tv_sec)
1366           || ((tvStop.tv_sec == tvT.tv_sec)
1367               && (tvStop.tv_usec > tvT.tv_usec))) {
1368         ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1369         ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1370         if (ptvWait->tv_usec < 0) {
1371           ASSERT(ptvWait->tv_sec > 0);
1372           ptvWait->tv_usec += 1000000;
1373           ptvWait->tv_sec -= 1;
1374         }
1375       }
1376     }
1377   }
1378 
1379   return true;
1380 }
1381 
GlobalSignalHandler(int signum)1382 static void GlobalSignalHandler(int signum) {
1383   PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1384 }
1385 
SetPosixSignalHandler(int signum,void (* handler)(int))1386 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1387                                                  void (*handler)(int)) {
1388   // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1389   // otherwise set one.
1390   if (handler == SIG_IGN || handler == SIG_DFL) {
1391     if (!InstallSignal(signum, handler)) {
1392       return false;
1393     }
1394     if (signal_dispatcher_.get()) {
1395       signal_dispatcher_->ClearHandler(signum);
1396       if (!signal_dispatcher_->HasHandlers()) {
1397         signal_dispatcher_.reset();
1398       }
1399     }
1400   } else {
1401     if (!signal_dispatcher_.get()) {
1402       signal_dispatcher_.reset(new PosixSignalDispatcher(this));
1403     }
1404     signal_dispatcher_->SetHandler(signum, handler);
1405     if (!InstallSignal(signum, &GlobalSignalHandler)) {
1406       return false;
1407     }
1408   }
1409   return true;
1410 }
1411 
InstallSignal(int signum,void (* handler)(int))1412 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
1413   struct sigaction act;
1414   // It doesn't really matter what we set this mask to.
1415   if (sigemptyset(&act.sa_mask) != 0) {
1416     LOG_ERR(LS_ERROR) << "Couldn't set mask";
1417     return false;
1418   }
1419   act.sa_handler = handler;
1420   // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
1421   // and it's a nuisance. Though some syscalls still return EINTR and there's no
1422   // real standard for which ones. :(
1423   act.sa_flags = SA_RESTART;
1424   if (sigaction(signum, &act, NULL) != 0) {
1425     LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
1426     return false;
1427   }
1428   return true;
1429 }
1430 #endif  // POSIX
1431 
1432 #ifdef WIN32
Wait(int cmsWait,bool process_io)1433 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1434   int cmsTotal = cmsWait;
1435   int cmsElapsed = 0;
1436   uint32 msStart = Time();
1437 
1438 #if LOGGING
1439   if (last_tick_dispatch_count_ == 0) {
1440     last_tick_tracked_ = msStart;
1441   }
1442 #endif
1443 
1444   fWait_ = true;
1445   while (fWait_) {
1446     std::vector<WSAEVENT> events;
1447     std::vector<Dispatcher *> event_owners;
1448 
1449     events.push_back(socket_ev_);
1450 
1451     {
1452       CritScope cr(&crit_);
1453       size_t i = 0;
1454       iterators_.push_back(&i);
1455       // Don't track dispatchers_.size(), because we want to pick up any new
1456       // dispatchers that were added while processing the loop.
1457       while (i < dispatchers_.size()) {
1458         Dispatcher* disp = dispatchers_[i++];
1459         if (!process_io && (disp != signal_wakeup_))
1460           continue;
1461         SOCKET s = disp->GetSocket();
1462         if (disp->CheckSignalClose()) {
1463           // We just signalled close, don't poll this socket
1464         } else if (s != INVALID_SOCKET) {
1465           WSAEventSelect(s,
1466                          events[0],
1467                          FlagsToEvents(disp->GetRequestedEvents()));
1468         } else {
1469           events.push_back(disp->GetWSAEvent());
1470           event_owners.push_back(disp);
1471         }
1472       }
1473       ASSERT(iterators_.back() == &i);
1474       iterators_.pop_back();
1475     }
1476 
1477     // Which is shorter, the delay wait or the asked wait?
1478 
1479     int cmsNext;
1480     if (cmsWait == kForever) {
1481       cmsNext = cmsWait;
1482     } else {
1483       cmsNext = _max(0, cmsTotal - cmsElapsed);
1484     }
1485 
1486     // Wait for one of the events to signal
1487     DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
1488                                         &events[0],
1489                                         false,
1490                                         cmsNext,
1491                                         false);
1492 
1493 #if 0  // LOGGING
1494     // we track this information purely for logging purposes.
1495     last_tick_dispatch_count_++;
1496     if (last_tick_dispatch_count_ >= 1000) {
1497       int32 elapsed = TimeSince(last_tick_tracked_);
1498       LOG(INFO) << "PhysicalSocketServer took " << elapsed
1499                 << "ms for 1000 events";
1500 
1501       // If we get more than 1000 events in a second, we are spinning badly
1502       // (normally it should take about 8-20 seconds).
1503       ASSERT(elapsed > 1000);
1504 
1505       last_tick_tracked_ = Time();
1506       last_tick_dispatch_count_ = 0;
1507     }
1508 #endif
1509 
1510     if (dw == WSA_WAIT_FAILED) {
1511       // Failed?
1512       // TODO: need a better strategy than this!
1513       int error = WSAGetLastError();
1514       ASSERT(false);
1515       return false;
1516     } else if (dw == WSA_WAIT_TIMEOUT) {
1517       // Timeout?
1518       return true;
1519     } else {
1520       // Figure out which one it is and call it
1521       CritScope cr(&crit_);
1522       int index = dw - WSA_WAIT_EVENT_0;
1523       if (index > 0) {
1524         --index; // The first event is the socket event
1525         event_owners[index]->OnPreEvent(0);
1526         event_owners[index]->OnEvent(0, 0);
1527       } else if (process_io) {
1528         size_t i = 0, end = dispatchers_.size();
1529         iterators_.push_back(&i);
1530         iterators_.push_back(&end);  // Don't iterate over new dispatchers.
1531         while (i < end) {
1532           Dispatcher* disp = dispatchers_[i++];
1533           SOCKET s = disp->GetSocket();
1534           if (s == INVALID_SOCKET)
1535             continue;
1536 
1537           WSANETWORKEVENTS wsaEvents;
1538           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1539           if (err == 0) {
1540 
1541 #if LOGGING
1542             {
1543               if ((wsaEvents.lNetworkEvents & FD_READ) &&
1544                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1545                 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
1546                              << wsaEvents.iErrorCode[FD_READ_BIT];
1547               }
1548               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1549                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1550                 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
1551                              << wsaEvents.iErrorCode[FD_WRITE_BIT];
1552               }
1553               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1554                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1555                 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
1556                              << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1557               }
1558               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1559                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1560                 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1561                              << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1562               }
1563               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1564                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1565                 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
1566                              << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1567               }
1568             }
1569 #endif
1570             uint32 ff = 0;
1571             int errcode = 0;
1572             if (wsaEvents.lNetworkEvents & FD_READ)
1573               ff |= DE_READ;
1574             if (wsaEvents.lNetworkEvents & FD_WRITE)
1575               ff |= DE_WRITE;
1576             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1577               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1578                 ff |= DE_CONNECT;
1579               } else {
1580                 ff |= DE_CLOSE;
1581                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1582               }
1583             }
1584             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1585               ff |= DE_ACCEPT;
1586             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1587               ff |= DE_CLOSE;
1588               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1589             }
1590             if (ff != 0) {
1591               disp->OnPreEvent(ff);
1592               disp->OnEvent(ff, errcode);
1593             }
1594           }
1595         }
1596         ASSERT(iterators_.back() == &end);
1597         iterators_.pop_back();
1598         ASSERT(iterators_.back() == &i);
1599         iterators_.pop_back();
1600       }
1601 
1602       // Reset the network event until new activity occurs
1603       WSAResetEvent(socket_ev_);
1604     }
1605 
1606     // Break?
1607     if (!fWait_)
1608       break;
1609     cmsElapsed = TimeSince(msStart);
1610     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1611        break;
1612     }
1613   }
1614 
1615   // Done
1616   return true;
1617 }
1618 #endif  // WIN32
1619 
1620 }  // namespace talk_base
1621