• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #if defined(_MSC_VER) && _MSC_VER < 1300
12 #pragma warning(disable:4786)
13 #endif
14 
15 #include <assert.h>
16 
17 #if defined(WEBRTC_POSIX)
18 #include <string.h>
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <sys/time.h>
22 #include <sys/select.h>
23 #include <unistd.h>
24 #include <signal.h>
25 #endif
26 
27 #if defined(WEBRTC_WIN)
28 #define WIN32_LEAN_AND_MEAN
29 #include <windows.h>
30 #include <winsock2.h>
31 #include <ws2tcpip.h>
32 #undef SetPort
33 #endif
34 
35 #include <algorithm>
36 #include <map>
37 
38 #include "webrtc/base/basictypes.h"
39 #include "webrtc/base/byteorder.h"
40 #include "webrtc/base/common.h"
41 #include "webrtc/base/logging.h"
42 #include "webrtc/base/nethelpers.h"
43 #include "webrtc/base/physicalsocketserver.h"
44 #include "webrtc/base/timeutils.h"
45 #include "webrtc/base/winping.h"
46 #include "webrtc/base/win32socketinit.h"
47 
48 // stm: this will tell us if we are on OSX
49 #ifdef HAVE_CONFIG_H
50 #include "config.h"
51 #endif
52 
#if defined(WEBRTC_POSIX)
#include <netinet/tcp.h>  // for TCP_NODELAY
// Socket option number used with getsockopt(IPPROTO_IP) to read the path MTU.
// Only supplied here when the system headers have not already defined it
// (until this is integrated from linux/in.h to netinet/in.h), so that an
// existing platform definition is never overridden.
#ifndef IP_MTU
#define IP_MTU 14
#endif
// Type of the option-value argument passed to getsockopt()/setsockopt().
typedef void* SockOptArg;
#endif  // WEBRTC_POSIX

#if defined(WEBRTC_WIN)
// Type of the option-value argument passed to getsockopt()/setsockopt().
typedef char* SockOptArg;
#endif  // WEBRTC_WIN
62 
63 namespace rtc {
64 
#if defined(WEBRTC_WIN)
// Table of standard MTUs from RFC 1191, listed from largest to smallest and
// terminated by a zero entry. PhysicalSocket::EstimateMTU() walks it in order
// on Windows. Rows that are commented out are not probed.
const uint16 PACKET_MAXIMUMS[] = {
  65535,    // Theoretical maximum, Hyperchannel
  32000,    // Nothing
  17914,    // 16Mb IBM Token Ring
  8166,     // IEEE 802.4
  //4464,   // IEEE 802.5 (4Mb max)
  4352,     // FDDI
  //2048,   // Wideband Network
  2002,     // IEEE 802.5 (4Mb recommended)
  //1536,   // Experimental Ethernet Networks
  //1500,   // Ethernet, Point-to-Point (default)
  1492,     // IEEE 802.3
  1006,     // SLIP, ARPANET
  //576,    // X.25 Networks
  //544,    // DEC IP Portal
  //512,    // NETBIOS
  508,      // IEEE 802/Source-Rt Bridge, ARCNET
  296,      // Point-to-Point (low delay)
  68,       // Official minimum
  0,        // End of list marker
};

// Header sizes subtracted from a candidate MTU to obtain the ping payload
// size, and the timeout handed to each ping, as used by EstimateMTU().
static const int IP_HEADER_SIZE = 20;
static const int IPV6_HEADER_SIZE = 40;
static const int ICMP_HEADER_SIZE = 8;
static const int ICMP_PING_TIMEOUT_MILLIS = 10000;
#endif  // WEBRTC_WIN
94 
95 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
96  public:
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s=INVALID_SOCKET)97   PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
98     : ss_(ss), s_(s), enabled_events_(0), error_(0),
99       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
100       resolver_(NULL) {
101 #if defined(WEBRTC_WIN)
102     // EnsureWinsockInit() ensures that winsock is initialized. The default
103     // version of this function doesn't do anything because winsock is
104     // initialized by constructor of a static object. If neccessary libjingle
105     // users can link it with a different version of this function by replacing
106     // win32socketinit.cc. See win32socketinit.cc for more details.
107     EnsureWinsockInit();
108 #endif
109     if (s_ != INVALID_SOCKET) {
110       enabled_events_ = DE_READ | DE_WRITE;
111 
112       int type = SOCK_STREAM;
113       socklen_t len = sizeof(type);
114       VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
115       udp_ = (SOCK_DGRAM == type);
116     }
117   }
118 
~PhysicalSocket()119   virtual ~PhysicalSocket() {
120     Close();
121   }
122 
123   // Creates the underlying OS socket (same as the "socket" function).
Create(int family,int type)124   virtual bool Create(int family, int type) {
125     Close();
126     s_ = ::socket(family, type, 0);
127     udp_ = (SOCK_DGRAM == type);
128     UpdateLastError();
129     if (udp_)
130       enabled_events_ = DE_READ | DE_WRITE;
131     return s_ != INVALID_SOCKET;
132   }
133 
GetLocalAddress() const134   SocketAddress GetLocalAddress() const {
135     sockaddr_storage addr_storage = {0};
136     socklen_t addrlen = sizeof(addr_storage);
137     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
138     int result = ::getsockname(s_, addr, &addrlen);
139     SocketAddress address;
140     if (result >= 0) {
141       SocketAddressFromSockAddrStorage(addr_storage, &address);
142     } else {
143       LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
144                       << s_;
145     }
146     return address;
147   }
148 
GetRemoteAddress() const149   SocketAddress GetRemoteAddress() const {
150     sockaddr_storage addr_storage = {0};
151     socklen_t addrlen = sizeof(addr_storage);
152     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
153     int result = ::getpeername(s_, addr, &addrlen);
154     SocketAddress address;
155     if (result >= 0) {
156       SocketAddressFromSockAddrStorage(addr_storage, &address);
157     } else {
158       LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
159                       << s_;
160     }
161     return address;
162   }
163 
Bind(const SocketAddress & bind_addr)164   int Bind(const SocketAddress& bind_addr) {
165     sockaddr_storage addr_storage;
166     size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
167     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
168     int err = ::bind(s_, addr, static_cast<int>(len));
169     UpdateLastError();
170 #ifdef _DEBUG
171     if (0 == err) {
172       dbg_addr_ = "Bound @ ";
173       dbg_addr_.append(GetLocalAddress().ToString());
174     }
175 #endif  // _DEBUG
176     return err;
177   }
178 
Connect(const SocketAddress & addr)179   int Connect(const SocketAddress& addr) {
180     // TODO: Implicit creation is required to reconnect...
181     // ...but should we make it more explicit?
182     if (state_ != CS_CLOSED) {
183       SetError(EALREADY);
184       return SOCKET_ERROR;
185     }
186     if (addr.IsUnresolved()) {
187       LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
188       resolver_ = new AsyncResolver();
189       resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
190       resolver_->Start(addr);
191       state_ = CS_CONNECTING;
192       return 0;
193     }
194 
195     return DoConnect(addr);
196   }
197 
DoConnect(const SocketAddress & connect_addr)198   int DoConnect(const SocketAddress& connect_addr) {
199     if ((s_ == INVALID_SOCKET) &&
200         !Create(connect_addr.family(), SOCK_STREAM)) {
201       return SOCKET_ERROR;
202     }
203     sockaddr_storage addr_storage;
204     size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
205     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
206     int err = ::connect(s_, addr, static_cast<int>(len));
207     UpdateLastError();
208     if (err == 0) {
209       state_ = CS_CONNECTED;
210     } else if (IsBlockingError(GetError())) {
211       state_ = CS_CONNECTING;
212       enabled_events_ |= DE_CONNECT;
213     } else {
214       return SOCKET_ERROR;
215     }
216 
217     enabled_events_ |= DE_READ | DE_WRITE;
218     return 0;
219   }
220 
GetError() const221   int GetError() const {
222     CritScope cs(&crit_);
223     return error_;
224   }
225 
SetError(int error)226   void SetError(int error) {
227     CritScope cs(&crit_);
228     error_ = error;
229   }
230 
GetState() const231   ConnState GetState() const {
232     return state_;
233   }
234 
GetOption(Option opt,int * value)235   int GetOption(Option opt, int* value) {
236     int slevel;
237     int sopt;
238     if (TranslateOption(opt, &slevel, &sopt) == -1)
239       return -1;
240     socklen_t optlen = sizeof(*value);
241     int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
242     if (ret != -1 && opt == OPT_DONTFRAGMENT) {
243 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
244       *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
245 #endif
246     }
247     return ret;
248   }
249 
SetOption(Option opt,int value)250   int SetOption(Option opt, int value) {
251     int slevel;
252     int sopt;
253     if (TranslateOption(opt, &slevel, &sopt) == -1)
254       return -1;
255     if (opt == OPT_DONTFRAGMENT) {
256 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
257       value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
258 #endif
259     }
260     return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
261   }
262 
Send(const void * pv,size_t cb)263   int Send(const void *pv, size_t cb) {
264     int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
265 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
266         // Suppress SIGPIPE. Without this, attempting to send on a socket whose
267         // other end is closed will result in a SIGPIPE signal being raised to
268         // our process, which by default will terminate the process, which we
269         // don't want. By specifying this flag, we'll just get the error EPIPE
270         // instead and can handle the error gracefully.
271         MSG_NOSIGNAL
272 #else
273         0
274 #endif
275         );
276     UpdateLastError();
277     MaybeRemapSendError();
278     // We have seen minidumps where this may be false.
279     ASSERT(sent <= static_cast<int>(cb));
280     if ((sent < 0) && IsBlockingError(GetError())) {
281       enabled_events_ |= DE_WRITE;
282     }
283     return sent;
284   }
285 
SendTo(const void * buffer,size_t length,const SocketAddress & addr)286   int SendTo(const void* buffer, size_t length, const SocketAddress& addr) {
287     sockaddr_storage saddr;
288     size_t len = addr.ToSockAddrStorage(&saddr);
289     int sent = ::sendto(
290         s_, static_cast<const char *>(buffer), static_cast<int>(length),
291 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
292         // Suppress SIGPIPE. See above for explanation.
293         MSG_NOSIGNAL,
294 #else
295         0,
296 #endif
297         reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
298     UpdateLastError();
299     MaybeRemapSendError();
300     // We have seen minidumps where this may be false.
301     ASSERT(sent <= static_cast<int>(length));
302     if ((sent < 0) && IsBlockingError(GetError())) {
303       enabled_events_ |= DE_WRITE;
304     }
305     return sent;
306   }
307 
Recv(void * buffer,size_t length)308   int Recv(void* buffer, size_t length) {
309     int received = ::recv(s_, static_cast<char*>(buffer),
310                           static_cast<int>(length), 0);
311     if ((received == 0) && (length != 0)) {
312       // Note: on graceful shutdown, recv can return 0.  In this case, we
313       // pretend it is blocking, and then signal close, so that simplifying
314       // assumptions can be made about Recv.
315       LOG(LS_WARNING) << "EOF from socket; deferring close event";
316       // Must turn this back on so that the select() loop will notice the close
317       // event.
318       enabled_events_ |= DE_READ;
319       SetError(EWOULDBLOCK);
320       return SOCKET_ERROR;
321     }
322     UpdateLastError();
323     int error = GetError();
324     bool success = (received >= 0) || IsBlockingError(error);
325     if (udp_ || success) {
326       enabled_events_ |= DE_READ;
327     }
328     if (!success) {
329       LOG_F(LS_VERBOSE) << "Error = " << error;
330     }
331     return received;
332   }
333 
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr)334   int RecvFrom(void* buffer, size_t length, SocketAddress *out_addr) {
335     sockaddr_storage addr_storage;
336     socklen_t addr_len = sizeof(addr_storage);
337     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
338     int received = ::recvfrom(s_, static_cast<char*>(buffer),
339                               static_cast<int>(length), 0, addr, &addr_len);
340     UpdateLastError();
341     if ((received >= 0) && (out_addr != NULL))
342       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
343     int error = GetError();
344     bool success = (received >= 0) || IsBlockingError(error);
345     if (udp_ || success) {
346       enabled_events_ |= DE_READ;
347     }
348     if (!success) {
349       LOG_F(LS_VERBOSE) << "Error = " << error;
350     }
351     return received;
352   }
353 
Listen(int backlog)354   int Listen(int backlog) {
355     int err = ::listen(s_, backlog);
356     UpdateLastError();
357     if (err == 0) {
358       state_ = CS_CONNECTING;
359       enabled_events_ |= DE_ACCEPT;
360 #ifdef _DEBUG
361       dbg_addr_ = "Listening @ ";
362       dbg_addr_.append(GetLocalAddress().ToString());
363 #endif  // _DEBUG
364     }
365     return err;
366   }
367 
Accept(SocketAddress * out_addr)368   AsyncSocket* Accept(SocketAddress *out_addr) {
369     sockaddr_storage addr_storage;
370     socklen_t addr_len = sizeof(addr_storage);
371     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
372     SOCKET s = ::accept(s_, addr, &addr_len);
373     UpdateLastError();
374     if (s == INVALID_SOCKET)
375       return NULL;
376     enabled_events_ |= DE_ACCEPT;
377     if (out_addr != NULL)
378       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
379     return ss_->WrapSocket(s);
380   }
381 
Close()382   int Close() {
383     if (s_ == INVALID_SOCKET)
384       return 0;
385     int err = ::closesocket(s_);
386     UpdateLastError();
387     s_ = INVALID_SOCKET;
388     state_ = CS_CLOSED;
389     enabled_events_ = 0;
390     if (resolver_) {
391       resolver_->Destroy(false);
392       resolver_ = NULL;
393     }
394     return err;
395   }
396 
EstimateMTU(uint16 * mtu)397   int EstimateMTU(uint16* mtu) {
398     SocketAddress addr = GetRemoteAddress();
399     if (addr.IsAny()) {
400       SetError(ENOTCONN);
401       return -1;
402     }
403 
404 #if defined(WEBRTC_WIN)
405     // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
406     WinPing ping;
407     if (!ping.IsValid()) {
408       SetError(EINVAL);  // can't think of a better error ID
409       return -1;
410     }
411     int header_size = ICMP_HEADER_SIZE;
412     if (addr.family() == AF_INET6) {
413       header_size += IPV6_HEADER_SIZE;
414     } else if (addr.family() == AF_INET) {
415       header_size += IP_HEADER_SIZE;
416     }
417 
418     for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
419       int32 size = PACKET_MAXIMUMS[level] - header_size;
420       WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
421                                              ICMP_PING_TIMEOUT_MILLIS,
422                                              1, false);
423       if (result == WinPing::PING_FAIL) {
424         SetError(EINVAL);  // can't think of a better error ID
425         return -1;
426       } else if (result != WinPing::PING_TOO_LARGE) {
427         *mtu = PACKET_MAXIMUMS[level];
428         return 0;
429       }
430     }
431 
432     ASSERT(false);
433     return -1;
434 #elif defined(WEBRTC_MAC)
435     // No simple way to do this on Mac OS X.
436     // SIOCGIFMTU would work if we knew which interface would be used, but
437     // figuring that out is pretty complicated. For now we'll return an error
438     // and let the caller pick a default MTU.
439     SetError(EINVAL);
440     return -1;
441 #elif defined(WEBRTC_LINUX)
442     // Gets the path MTU.
443     int value;
444     socklen_t vlen = sizeof(value);
445     int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
446     if (err < 0) {
447       UpdateLastError();
448       return err;
449     }
450 
451     ASSERT((0 <= value) && (value <= 65536));
452     *mtu = value;
453     return 0;
454 #elif defined(__native_client__)
455     // Most socket operations, including this, will fail in NaCl's sandbox.
456     error_ = EACCES;
457     return -1;
458 #endif
459   }
460 
socketserver()461   SocketServer* socketserver() { return ss_; }
462 
463  protected:
OnResolveResult(AsyncResolverInterface * resolver)464   void OnResolveResult(AsyncResolverInterface* resolver) {
465     if (resolver != resolver_) {
466       return;
467     }
468 
469     int error = resolver_->GetError();
470     if (error == 0) {
471       error = DoConnect(resolver_->address());
472     } else {
473       Close();
474     }
475 
476     if (error) {
477       SetError(error);
478       SignalCloseEvent(this, error);
479     }
480   }
481 
UpdateLastError()482   void UpdateLastError() {
483     SetError(LAST_SYSTEM_ERROR);
484   }
485 
MaybeRemapSendError()486   void MaybeRemapSendError() {
487 #if defined(WEBRTC_MAC)
488     // https://developer.apple.com/library/mac/documentation/Darwin/
489     // Reference/ManPages/man2/sendto.2.html
490     // ENOBUFS - The output queue for a network interface is full.
491     // This generally indicates that the interface has stopped sending,
492     // but may be caused by transient congestion.
493     if (GetError() == ENOBUFS) {
494       SetError(EWOULDBLOCK);
495     }
496 #endif
497   }
498 
TranslateOption(Option opt,int * slevel,int * sopt)499   static int TranslateOption(Option opt, int* slevel, int* sopt) {
500     switch (opt) {
501       case OPT_DONTFRAGMENT:
502 #if defined(WEBRTC_WIN)
503         *slevel = IPPROTO_IP;
504         *sopt = IP_DONTFRAGMENT;
505         break;
506 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
507         LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
508         return -1;
509 #elif defined(WEBRTC_POSIX)
510         *slevel = IPPROTO_IP;
511         *sopt = IP_MTU_DISCOVER;
512         break;
513 #endif
514       case OPT_RCVBUF:
515         *slevel = SOL_SOCKET;
516         *sopt = SO_RCVBUF;
517         break;
518       case OPT_SNDBUF:
519         *slevel = SOL_SOCKET;
520         *sopt = SO_SNDBUF;
521         break;
522       case OPT_NODELAY:
523         *slevel = IPPROTO_TCP;
524         *sopt = TCP_NODELAY;
525         break;
526       case OPT_DSCP:
527         LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
528         return -1;
529       case OPT_RTP_SENDTIME_EXTN_ID:
530         return -1;  // No logging is necessary as this not a OS socket option.
531       default:
532         ASSERT(false);
533         return -1;
534     }
535     return 0;
536   }
537 
538   PhysicalSocketServer* ss_;
539   SOCKET s_;
540   uint8 enabled_events_;
541   bool udp_;
542   int error_;
543   // Protects |error_| that is accessed from different threads.
544   mutable CriticalSection crit_;
545   ConnState state_;
546   AsyncResolver* resolver_;
547 
548 #ifdef _DEBUG
549   std::string dbg_addr_;
550 #endif  // _DEBUG;
551 };
552 
553 #if defined(WEBRTC_POSIX)
554 class EventDispatcher : public Dispatcher {
555  public:
EventDispatcher(PhysicalSocketServer * ss)556   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
557     if (pipe(afd_) < 0)
558       LOG(LERROR) << "pipe failed";
559     ss_->Add(this);
560   }
561 
~EventDispatcher()562   virtual ~EventDispatcher() {
563     ss_->Remove(this);
564     close(afd_[0]);
565     close(afd_[1]);
566   }
567 
Signal()568   virtual void Signal() {
569     CritScope cs(&crit_);
570     if (!fSignaled_) {
571       const uint8 b[1] = { 0 };
572       if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
573         fSignaled_ = true;
574       }
575     }
576   }
577 
GetRequestedEvents()578   virtual uint32 GetRequestedEvents() {
579     return DE_READ;
580   }
581 
OnPreEvent(uint32 ff)582   virtual void OnPreEvent(uint32 ff) {
583     // It is not possible to perfectly emulate an auto-resetting event with
584     // pipes.  This simulates it by resetting before the event is handled.
585 
586     CritScope cs(&crit_);
587     if (fSignaled_) {
588       uint8 b[4];  // Allow for reading more than 1 byte, but expect 1.
589       VERIFY(1 == read(afd_[0], b, sizeof(b)));
590       fSignaled_ = false;
591     }
592   }
593 
OnEvent(uint32 ff,int err)594   virtual void OnEvent(uint32 ff, int err) {
595     ASSERT(false);
596   }
597 
GetDescriptor()598   virtual int GetDescriptor() {
599     return afd_[0];
600   }
601 
IsDescriptorClosed()602   virtual bool IsDescriptorClosed() {
603     return false;
604   }
605 
606  private:
607   PhysicalSocketServer *ss_;
608   int afd_[2];
609   bool fSignaled_;
610   CriticalSection crit_;
611 };
612 
613 // These two classes use the self-pipe trick to deliver POSIX signals to our
614 // select loop. This is the only safe, reliable, cross-platform way to do
615 // non-trivial things with a POSIX signal in an event-driven program (until
616 // proper pselect() implementations become ubiquitous).
617 
618 class PosixSignalHandler {
619  public:
620   // POSIX only specifies 32 signals, but in principle the system might have
621   // more and the programmer might choose to use them, so we size our array
622   // for 128.
623   static const int kNumPosixSignals = 128;
624 
625   // There is just a single global instance. (Signal handlers do not get any
626   // sort of user-defined void * parameter, so they can't access anything that
627   // isn't global.)
Instance()628   static PosixSignalHandler* Instance() {
629     LIBJINGLE_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ());
630     return &instance;
631   }
632 
633   // Returns true if the given signal number is set.
IsSignalSet(int signum) const634   bool IsSignalSet(int signum) const {
635     ASSERT(signum < ARRAY_SIZE(received_signal_));
636     if (signum < ARRAY_SIZE(received_signal_)) {
637       return received_signal_[signum];
638     } else {
639       return false;
640     }
641   }
642 
643   // Clears the given signal number.
ClearSignal(int signum)644   void ClearSignal(int signum) {
645     ASSERT(signum < ARRAY_SIZE(received_signal_));
646     if (signum < ARRAY_SIZE(received_signal_)) {
647       received_signal_[signum] = false;
648     }
649   }
650 
651   // Returns the file descriptor to monitor for signal events.
GetDescriptor() const652   int GetDescriptor() const {
653     return afd_[0];
654   }
655 
656   // This is called directly from our real signal handler, so it must be
657   // signal-handler-safe. That means it cannot assume anything about the
658   // user-level state of the process, since the handler could be executed at any
659   // time on any thread.
OnPosixSignalReceived(int signum)660   void OnPosixSignalReceived(int signum) {
661     if (signum >= ARRAY_SIZE(received_signal_)) {
662       // We don't have space in our array for this.
663       return;
664     }
665     // Set a flag saying we've seen this signal.
666     received_signal_[signum] = true;
667     // Notify application code that we got a signal.
668     const uint8 b[1] = { 0 };
669     if (-1 == write(afd_[1], b, sizeof(b))) {
670       // Nothing we can do here. If there's an error somehow then there's
671       // nothing we can safely do from a signal handler.
672       // No, we can't even safely log it.
673       // But, we still have to check the return value here. Otherwise,
674       // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
675       return;
676     }
677   }
678 
679  private:
PosixSignalHandler()680   PosixSignalHandler() {
681     if (pipe(afd_) < 0) {
682       LOG_ERR(LS_ERROR) << "pipe failed";
683       return;
684     }
685     if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
686       LOG_ERR(LS_WARNING) << "fcntl #1 failed";
687     }
688     if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
689       LOG_ERR(LS_WARNING) << "fcntl #2 failed";
690     }
691     memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
692            0,
693            sizeof(received_signal_));
694   }
695 
~PosixSignalHandler()696   ~PosixSignalHandler() {
697     int fd1 = afd_[0];
698     int fd2 = afd_[1];
699     // We clobber the stored file descriptor numbers here or else in principle
700     // a signal that happens to be delivered during application termination
701     // could erroneously write a zero byte to an unrelated file handle in
702     // OnPosixSignalReceived() if some other file happens to be opened later
703     // during shutdown and happens to be given the same file descriptor number
704     // as our pipe had. Unfortunately even with this precaution there is still a
705     // race where that could occur if said signal happens to be handled
706     // concurrently with this code and happens to have already read the value of
707     // afd_[1] from memory before we clobber it, but that's unlikely.
708     afd_[0] = -1;
709     afd_[1] = -1;
710     close(fd1);
711     close(fd2);
712   }
713 
714   int afd_[2];
715   // These are boolean flags that will be set in our signal handler and read
716   // and cleared from Wait(). There is a race involved in this, but it is
717   // benign. The signal handler sets the flag before signaling the pipe, so
718   // we'll never end up blocking in select() while a flag is still true.
719   // However, if two of the same signal arrive close to each other then it's
720   // possible that the second time the handler may set the flag while it's still
721   // true, meaning that signal will be missed. But the first occurrence of it
722   // will still be handled, so this isn't a problem.
723   // Volatile is not necessary here for correctness, but this data _is_ volatile
724   // so I've marked it as such.
725   volatile uint8 received_signal_[kNumPosixSignals];
726 };
727 
728 class PosixSignalDispatcher : public Dispatcher {
729  public:
PosixSignalDispatcher(PhysicalSocketServer * owner)730   PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
731     owner_->Add(this);
732   }
733 
~PosixSignalDispatcher()734   virtual ~PosixSignalDispatcher() {
735     owner_->Remove(this);
736   }
737 
GetRequestedEvents()738   virtual uint32 GetRequestedEvents() {
739     return DE_READ;
740   }
741 
OnPreEvent(uint32 ff)742   virtual void OnPreEvent(uint32 ff) {
743     // Events might get grouped if signals come very fast, so we read out up to
744     // 16 bytes to make sure we keep the pipe empty.
745     uint8 b[16];
746     ssize_t ret = read(GetDescriptor(), b, sizeof(b));
747     if (ret < 0) {
748       LOG_ERR(LS_WARNING) << "Error in read()";
749     } else if (ret == 0) {
750       LOG(LS_WARNING) << "Should have read at least one byte";
751     }
752   }
753 
OnEvent(uint32 ff,int err)754   virtual void OnEvent(uint32 ff, int err) {
755     for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
756          ++signum) {
757       if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
758         PosixSignalHandler::Instance()->ClearSignal(signum);
759         HandlerMap::iterator i = handlers_.find(signum);
760         if (i == handlers_.end()) {
761           // This can happen if a signal is delivered to our process at around
762           // the same time as we unset our handler for it. It is not an error
763           // condition, but it's unusual enough to be worth logging.
764           LOG(LS_INFO) << "Received signal with no handler: " << signum;
765         } else {
766           // Otherwise, execute our handler.
767           (*i->second)(signum);
768         }
769       }
770     }
771   }
772 
GetDescriptor()773   virtual int GetDescriptor() {
774     return PosixSignalHandler::Instance()->GetDescriptor();
775   }
776 
IsDescriptorClosed()777   virtual bool IsDescriptorClosed() {
778     return false;
779   }
780 
SetHandler(int signum,void (* handler)(int))781   void SetHandler(int signum, void (*handler)(int)) {
782     handlers_[signum] = handler;
783   }
784 
ClearHandler(int signum)785   void ClearHandler(int signum) {
786     handlers_.erase(signum);
787   }
788 
HasHandlers()789   bool HasHandlers() {
790     return !handlers_.empty();
791   }
792 
793  private:
794   typedef std::map<int, void (*)(int)> HandlerMap;
795 
796   HandlerMap handlers_;
797   // Our owner.
798   PhysicalSocketServer *owner_;
799 };
800 
801 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
802  public:
SocketDispatcher(PhysicalSocketServer * ss)803   explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
804   }
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)805   SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
806   }
807 
~SocketDispatcher()808   virtual ~SocketDispatcher() {
809     Close();
810   }
811 
Initialize()812   bool Initialize() {
813     ss_->Add(this);
814     fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
815     return true;
816   }
817 
Create(int type)818   virtual bool Create(int type) {
819     return Create(AF_INET, type);
820   }
821 
Create(int family,int type)822   virtual bool Create(int family, int type) {
823     // Change the socket to be non-blocking.
824     if (!PhysicalSocket::Create(family, type))
825       return false;
826 
827     return Initialize();
828   }
829 
GetDescriptor()830   virtual int GetDescriptor() {
831     return s_;
832   }
833 
IsDescriptorClosed()834   virtual bool IsDescriptorClosed() {
835     // We don't have a reliable way of distinguishing end-of-stream
836     // from readability.  So test on each readable call.  Is this
837     // inefficient?  Probably.
838     char ch;
839     ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
840     if (res > 0) {
841       // Data available, so not closed.
842       return false;
843     } else if (res == 0) {
844       // EOF, so closed.
845       return true;
846     } else {  // error
847       switch (errno) {
848         // Returned if we've already closed s_.
849         case EBADF:
850         // Returned during ungraceful peer shutdown.
851         case ECONNRESET:
852           return true;
853         default:
854           // Assume that all other errors are just blocking errors, meaning the
855           // connection is still good but we just can't read from it right now.
856           // This should only happen when connecting (and at most once), because
857           // in all other cases this function is only called if the file
858           // descriptor is already known to be in the readable state. However,
859           // it's not necessary a problem if we spuriously interpret a
860           // "connection lost"-type error as a blocking error, because typically
861           // the next recv() will get EOF, so we'll still eventually notice that
862           // the socket is closed.
863           LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
864           return false;
865       }
866     }
867   }
868 
GetRequestedEvents()869   virtual uint32 GetRequestedEvents() {
870     return enabled_events_;
871   }
872 
OnPreEvent(uint32 ff)873   virtual void OnPreEvent(uint32 ff) {
874     if ((ff & DE_CONNECT) != 0)
875       state_ = CS_CONNECTED;
876     if ((ff & DE_CLOSE) != 0)
877       state_ = CS_CLOSED;
878   }
879 
OnEvent(uint32 ff,int err)880   virtual void OnEvent(uint32 ff, int err) {
881     // Make sure we deliver connect/accept first. Otherwise, consumers may see
882     // something like a READ followed by a CONNECT, which would be odd.
883     if ((ff & DE_CONNECT) != 0) {
884       enabled_events_ &= ~DE_CONNECT;
885       SignalConnectEvent(this);
886     }
887     if ((ff & DE_ACCEPT) != 0) {
888       enabled_events_ &= ~DE_ACCEPT;
889       SignalReadEvent(this);
890     }
891     if ((ff & DE_READ) != 0) {
892       enabled_events_ &= ~DE_READ;
893       SignalReadEvent(this);
894     }
895     if ((ff & DE_WRITE) != 0) {
896       enabled_events_ &= ~DE_WRITE;
897       SignalWriteEvent(this);
898     }
899     if ((ff & DE_CLOSE) != 0) {
900       // The socket is now dead to us, so stop checking it.
901       enabled_events_ = 0;
902       SignalCloseEvent(this, err);
903     }
904   }
905 
Close()906   virtual int Close() {
907     if (s_ == INVALID_SOCKET)
908       return 0;
909 
910     ss_->Remove(this);
911     return PhysicalSocket::Close();
912   }
913 };
914 
915 class FileDispatcher: public Dispatcher, public AsyncFile {
916  public:
FileDispatcher(int fd,PhysicalSocketServer * ss)917   FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
918     set_readable(true);
919 
920     ss_->Add(this);
921 
922     fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
923   }
924 
~FileDispatcher()925   virtual ~FileDispatcher() {
926     ss_->Remove(this);
927   }
928 
socketserver()929   SocketServer* socketserver() { return ss_; }
930 
GetDescriptor()931   virtual int GetDescriptor() {
932     return fd_;
933   }
934 
IsDescriptorClosed()935   virtual bool IsDescriptorClosed() {
936     return false;
937   }
938 
GetRequestedEvents()939   virtual uint32 GetRequestedEvents() {
940     return flags_;
941   }
942 
OnPreEvent(uint32 ff)943   virtual void OnPreEvent(uint32 ff) {
944   }
945 
OnEvent(uint32 ff,int err)946   virtual void OnEvent(uint32 ff, int err) {
947     if ((ff & DE_READ) != 0)
948       SignalReadEvent(this);
949     if ((ff & DE_WRITE) != 0)
950       SignalWriteEvent(this);
951     if ((ff & DE_CLOSE) != 0)
952       SignalCloseEvent(this, err);
953   }
954 
readable()955   virtual bool readable() {
956     return (flags_ & DE_READ) != 0;
957   }
958 
set_readable(bool value)959   virtual void set_readable(bool value) {
960     flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
961   }
962 
writable()963   virtual bool writable() {
964     return (flags_ & DE_WRITE) != 0;
965   }
966 
set_writable(bool value)967   virtual void set_writable(bool value) {
968     flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
969   }
970 
971  private:
972   PhysicalSocketServer* ss_;
973   int fd_;
974   int flags_;
975 };
976 
CreateFile(int fd)977 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
978   return new FileDispatcher(fd, this);
979 }
980 
981 #endif // WEBRTC_POSIX
982 
983 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32 events)984 static uint32 FlagsToEvents(uint32 events) {
985   uint32 ffFD = FD_CLOSE;
986   if (events & DE_READ)
987     ffFD |= FD_READ;
988   if (events & DE_WRITE)
989     ffFD |= FD_WRITE;
990   if (events & DE_CONNECT)
991     ffFD |= FD_CONNECT;
992   if (events & DE_ACCEPT)
993     ffFD |= FD_ACCEPT;
994   return ffFD;
995 }
996 
997 class EventDispatcher : public Dispatcher {
998  public:
EventDispatcher(PhysicalSocketServer * ss)999   EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
1000     hev_ = WSACreateEvent();
1001     if (hev_) {
1002       ss_->Add(this);
1003     }
1004   }
1005 
~EventDispatcher()1006   ~EventDispatcher() {
1007     if (hev_ != NULL) {
1008       ss_->Remove(this);
1009       WSACloseEvent(hev_);
1010       hev_ = NULL;
1011     }
1012   }
1013 
Signal()1014   virtual void Signal() {
1015     if (hev_ != NULL)
1016       WSASetEvent(hev_);
1017   }
1018 
GetRequestedEvents()1019   virtual uint32 GetRequestedEvents() {
1020     return 0;
1021   }
1022 
OnPreEvent(uint32 ff)1023   virtual void OnPreEvent(uint32 ff) {
1024     WSAResetEvent(hev_);
1025   }
1026 
OnEvent(uint32 ff,int err)1027   virtual void OnEvent(uint32 ff, int err) {
1028   }
1029 
GetWSAEvent()1030   virtual WSAEVENT GetWSAEvent() {
1031     return hev_;
1032   }
1033 
GetSocket()1034   virtual SOCKET GetSocket() {
1035     return INVALID_SOCKET;
1036   }
1037 
CheckSignalClose()1038   virtual bool CheckSignalClose() { return false; }
1039 
1040 private:
1041   PhysicalSocketServer* ss_;
1042   WSAEVENT hev_;
1043 };
1044 
1045 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1046  public:
1047   static int next_id_;
1048   int id_;
1049   bool signal_close_;
1050   int signal_err_;
1051 
SocketDispatcher(PhysicalSocketServer * ss)1052   SocketDispatcher(PhysicalSocketServer* ss)
1053       : PhysicalSocket(ss),
1054         id_(0),
1055         signal_close_(false) {
1056   }
1057 
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)1058   SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1059       : PhysicalSocket(ss, s),
1060         id_(0),
1061         signal_close_(false) {
1062   }
1063 
~SocketDispatcher()1064   virtual ~SocketDispatcher() {
1065     Close();
1066   }
1067 
Initialize()1068   bool Initialize() {
1069     ASSERT(s_ != INVALID_SOCKET);
1070     // Must be a non-blocking
1071     u_long argp = 1;
1072     ioctlsocket(s_, FIONBIO, &argp);
1073     ss_->Add(this);
1074     return true;
1075   }
1076 
Create(int type)1077   virtual bool Create(int type) {
1078     return Create(AF_INET, type);
1079   }
1080 
Create(int family,int type)1081   virtual bool Create(int family, int type) {
1082     // Create socket
1083     if (!PhysicalSocket::Create(family, type))
1084       return false;
1085 
1086     if (!Initialize())
1087       return false;
1088 
1089     do { id_ = ++next_id_; } while (id_ == 0);
1090     return true;
1091   }
1092 
Close()1093   virtual int Close() {
1094     if (s_ == INVALID_SOCKET)
1095       return 0;
1096 
1097     id_ = 0;
1098     signal_close_ = false;
1099     ss_->Remove(this);
1100     return PhysicalSocket::Close();
1101   }
1102 
GetRequestedEvents()1103   virtual uint32 GetRequestedEvents() {
1104     return enabled_events_;
1105   }
1106 
OnPreEvent(uint32 ff)1107   virtual void OnPreEvent(uint32 ff) {
1108     if ((ff & DE_CONNECT) != 0)
1109       state_ = CS_CONNECTED;
1110     // We set CS_CLOSED from CheckSignalClose.
1111   }
1112 
OnEvent(uint32 ff,int err)1113   virtual void OnEvent(uint32 ff, int err) {
1114     int cache_id = id_;
1115     // Make sure we deliver connect/accept first. Otherwise, consumers may see
1116     // something like a READ followed by a CONNECT, which would be odd.
1117     if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1118       if (ff != DE_CONNECT)
1119         LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1120       enabled_events_ &= ~DE_CONNECT;
1121 #ifdef _DEBUG
1122       dbg_addr_ = "Connected @ ";
1123       dbg_addr_.append(GetRemoteAddress().ToString());
1124 #endif  // _DEBUG
1125       SignalConnectEvent(this);
1126     }
1127     if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1128       enabled_events_ &= ~DE_ACCEPT;
1129       SignalReadEvent(this);
1130     }
1131     if ((ff & DE_READ) != 0) {
1132       enabled_events_ &= ~DE_READ;
1133       SignalReadEvent(this);
1134     }
1135     if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1136       enabled_events_ &= ~DE_WRITE;
1137       SignalWriteEvent(this);
1138     }
1139     if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1140       signal_close_ = true;
1141       signal_err_ = err;
1142     }
1143   }
1144 
GetWSAEvent()1145   virtual WSAEVENT GetWSAEvent() {
1146     return WSA_INVALID_EVENT;
1147   }
1148 
GetSocket()1149   virtual SOCKET GetSocket() {
1150     return s_;
1151   }
1152 
CheckSignalClose()1153   virtual bool CheckSignalClose() {
1154     if (!signal_close_)
1155       return false;
1156 
1157     char ch;
1158     if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1159       return false;
1160 
1161     state_ = CS_CLOSED;
1162     signal_close_ = false;
1163     SignalCloseEvent(this, signal_err_);
1164     return true;
1165   }
1166 };
1167 
1168 int SocketDispatcher::next_id_ = 0;
1169 
1170 #endif  // WEBRTC_WIN
1171 
1172 // Sets the value of a boolean value to false when signaled.
1173 class Signaler : public EventDispatcher {
1174  public:
Signaler(PhysicalSocketServer * ss,bool * pf)1175   Signaler(PhysicalSocketServer* ss, bool* pf)
1176       : EventDispatcher(ss), pf_(pf) {
1177   }
~Signaler()1178   virtual ~Signaler() { }
1179 
OnEvent(uint32 ff,int err)1180   void OnEvent(uint32 ff, int err) {
1181     if (pf_)
1182       *pf_ = false;
1183   }
1184 
1185  private:
1186   bool *pf_;
1187 };
1188 
PhysicalSocketServer()1189 PhysicalSocketServer::PhysicalSocketServer()
1190     : fWait_(false) {
1191   signal_wakeup_ = new Signaler(this, &fWait_);
1192 #if defined(WEBRTC_WIN)
1193   socket_ev_ = WSACreateEvent();
1194 #endif
1195 }
1196 
~PhysicalSocketServer()1197 PhysicalSocketServer::~PhysicalSocketServer() {
1198 #if defined(WEBRTC_WIN)
1199   WSACloseEvent(socket_ev_);
1200 #endif
1201 #if defined(WEBRTC_POSIX)
1202   signal_dispatcher_.reset();
1203 #endif
1204   delete signal_wakeup_;
1205   ASSERT(dispatchers_.empty());
1206 }
1207 
WakeUp()1208 void PhysicalSocketServer::WakeUp() {
1209   signal_wakeup_->Signal();
1210 }
1211 
CreateSocket(int type)1212 Socket* PhysicalSocketServer::CreateSocket(int type) {
1213   return CreateSocket(AF_INET, type);
1214 }
1215 
CreateSocket(int family,int type)1216 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1217   PhysicalSocket* socket = new PhysicalSocket(this);
1218   if (socket->Create(family, type)) {
1219     return socket;
1220   } else {
1221     delete socket;
1222     return 0;
1223   }
1224 }
1225 
CreateAsyncSocket(int type)1226 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1227   return CreateAsyncSocket(AF_INET, type);
1228 }
1229 
CreateAsyncSocket(int family,int type)1230 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1231   SocketDispatcher* dispatcher = new SocketDispatcher(this);
1232   if (dispatcher->Create(family, type)) {
1233     return dispatcher;
1234   } else {
1235     delete dispatcher;
1236     return 0;
1237   }
1238 }
1239 
WrapSocket(SOCKET s)1240 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1241   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1242   if (dispatcher->Initialize()) {
1243     return dispatcher;
1244   } else {
1245     delete dispatcher;
1246     return 0;
1247   }
1248 }
1249 
Add(Dispatcher * pdispatcher)1250 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1251   CritScope cs(&crit_);
1252   // Prevent duplicates. This can cause dead dispatchers to stick around.
1253   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1254                                            dispatchers_.end(),
1255                                            pdispatcher);
1256   if (pos != dispatchers_.end())
1257     return;
1258   dispatchers_.push_back(pdispatcher);
1259 }
1260 
Remove(Dispatcher * pdispatcher)1261 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1262   CritScope cs(&crit_);
1263   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1264                                            dispatchers_.end(),
1265                                            pdispatcher);
1266   // We silently ignore duplicate calls to Add, so we should silently ignore
1267   // the (expected) symmetric calls to Remove. Note that this may still hide
1268   // a real issue, so we at least log a warning about it.
1269   if (pos == dispatchers_.end()) {
1270     LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1271                     << "dispatcher, potentially from a duplicate call to Add.";
1272     return;
1273   }
1274   size_t index = pos - dispatchers_.begin();
1275   dispatchers_.erase(pos);
1276   for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1277        ++it) {
1278     if (index < **it) {
1279       --**it;
1280     }
1281   }
1282 }
1283 
1284 #if defined(WEBRTC_POSIX)
Wait(int cmsWait,bool process_io)1285 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1286   // Calculate timing information
1287 
1288   struct timeval *ptvWait = NULL;
1289   struct timeval tvWait;
1290   struct timeval tvStop;
1291   if (cmsWait != kForever) {
1292     // Calculate wait timeval
1293     tvWait.tv_sec = cmsWait / 1000;
1294     tvWait.tv_usec = (cmsWait % 1000) * 1000;
1295     ptvWait = &tvWait;
1296 
1297     // Calculate when to return in a timeval
1298     gettimeofday(&tvStop, NULL);
1299     tvStop.tv_sec += tvWait.tv_sec;
1300     tvStop.tv_usec += tvWait.tv_usec;
1301     if (tvStop.tv_usec >= 1000000) {
1302       tvStop.tv_usec -= 1000000;
1303       tvStop.tv_sec += 1;
1304     }
1305   }
1306 
1307   // Zero all fd_sets. Don't need to do this inside the loop since
1308   // select() zeros the descriptors not signaled
1309 
1310   fd_set fdsRead;
1311   FD_ZERO(&fdsRead);
1312   fd_set fdsWrite;
1313   FD_ZERO(&fdsWrite);
1314 
1315   fWait_ = true;
1316 
1317   while (fWait_) {
1318     int fdmax = -1;
1319     {
1320       CritScope cr(&crit_);
1321       for (size_t i = 0; i < dispatchers_.size(); ++i) {
1322         // Query dispatchers for read and write wait state
1323         Dispatcher *pdispatcher = dispatchers_[i];
1324         ASSERT(pdispatcher);
1325         if (!process_io && (pdispatcher != signal_wakeup_))
1326           continue;
1327         int fd = pdispatcher->GetDescriptor();
1328         if (fd > fdmax)
1329           fdmax = fd;
1330 
1331         uint32 ff = pdispatcher->GetRequestedEvents();
1332         if (ff & (DE_READ | DE_ACCEPT))
1333           FD_SET(fd, &fdsRead);
1334         if (ff & (DE_WRITE | DE_CONNECT))
1335           FD_SET(fd, &fdsWrite);
1336       }
1337     }
1338 
1339     // Wait then call handlers as appropriate
1340     // < 0 means error
1341     // 0 means timeout
1342     // > 0 means count of descriptors ready
1343     int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
1344 
1345     // If error, return error.
1346     if (n < 0) {
1347       if (errno != EINTR) {
1348         LOG_E(LS_ERROR, EN, errno) << "select";
1349         return false;
1350       }
1351       // Else ignore the error and keep going. If this EINTR was for one of the
1352       // signals managed by this PhysicalSocketServer, the
1353       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1354       // iteration.
1355     } else if (n == 0) {
1356       // If timeout, return success
1357       return true;
1358     } else {
1359       // We have signaled descriptors
1360       CritScope cr(&crit_);
1361       for (size_t i = 0; i < dispatchers_.size(); ++i) {
1362         Dispatcher *pdispatcher = dispatchers_[i];
1363         int fd = pdispatcher->GetDescriptor();
1364         uint32 ff = 0;
1365         int errcode = 0;
1366 
1367         // Reap any error code, which can be signaled through reads or writes.
1368         // TODO: Should we set errcode if getsockopt fails?
1369         if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1370           socklen_t len = sizeof(errcode);
1371           ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1372         }
1373 
1374         // Check readable descriptors. If we're waiting on an accept, signal
1375         // that. Otherwise we're waiting for data, check to see if we're
1376         // readable or really closed.
1377         // TODO: Only peek at TCP descriptors.
1378         if (FD_ISSET(fd, &fdsRead)) {
1379           FD_CLR(fd, &fdsRead);
1380           if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1381             ff |= DE_ACCEPT;
1382           } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1383             ff |= DE_CLOSE;
1384           } else {
1385             ff |= DE_READ;
1386           }
1387         }
1388 
1389         // Check writable descriptors. If we're waiting on a connect, detect
1390         // success versus failure by the reaped error code.
1391         if (FD_ISSET(fd, &fdsWrite)) {
1392           FD_CLR(fd, &fdsWrite);
1393           if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1394             if (!errcode) {
1395               ff |= DE_CONNECT;
1396             } else {
1397               ff |= DE_CLOSE;
1398             }
1399           } else {
1400             ff |= DE_WRITE;
1401           }
1402         }
1403 
1404         // Tell the descriptor about the event.
1405         if (ff != 0) {
1406           pdispatcher->OnPreEvent(ff);
1407           pdispatcher->OnEvent(ff, errcode);
1408         }
1409       }
1410     }
1411 
1412     // Recalc the time remaining to wait. Doing it here means it doesn't get
1413     // calced twice the first time through the loop
1414     if (ptvWait) {
1415       ptvWait->tv_sec = 0;
1416       ptvWait->tv_usec = 0;
1417       struct timeval tvT;
1418       gettimeofday(&tvT, NULL);
1419       if ((tvStop.tv_sec > tvT.tv_sec)
1420           || ((tvStop.tv_sec == tvT.tv_sec)
1421               && (tvStop.tv_usec > tvT.tv_usec))) {
1422         ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1423         ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1424         if (ptvWait->tv_usec < 0) {
1425           ASSERT(ptvWait->tv_sec > 0);
1426           ptvWait->tv_usec += 1000000;
1427           ptvWait->tv_sec -= 1;
1428         }
1429       }
1430     }
1431   }
1432 
1433   return true;
1434 }
1435 
GlobalSignalHandler(int signum)1436 static void GlobalSignalHandler(int signum) {
1437   PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1438 }
1439 
SetPosixSignalHandler(int signum,void (* handler)(int))1440 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1441                                                  void (*handler)(int)) {
1442   // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1443   // otherwise set one.
1444   if (handler == SIG_IGN || handler == SIG_DFL) {
1445     if (!InstallSignal(signum, handler)) {
1446       return false;
1447     }
1448     if (signal_dispatcher_) {
1449       signal_dispatcher_->ClearHandler(signum);
1450       if (!signal_dispatcher_->HasHandlers()) {
1451         signal_dispatcher_.reset();
1452       }
1453     }
1454   } else {
1455     if (!signal_dispatcher_) {
1456       signal_dispatcher_.reset(new PosixSignalDispatcher(this));
1457     }
1458     signal_dispatcher_->SetHandler(signum, handler);
1459     if (!InstallSignal(signum, &GlobalSignalHandler)) {
1460       return false;
1461     }
1462   }
1463   return true;
1464 }
1465 
signal_dispatcher()1466 Dispatcher* PhysicalSocketServer::signal_dispatcher() {
1467   return signal_dispatcher_.get();
1468 }
1469 
InstallSignal(int signum,void (* handler)(int))1470 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
1471   struct sigaction act;
1472   // It doesn't really matter what we set this mask to.
1473   if (sigemptyset(&act.sa_mask) != 0) {
1474     LOG_ERR(LS_ERROR) << "Couldn't set mask";
1475     return false;
1476   }
1477   act.sa_handler = handler;
1478 #if !defined(__native_client__)
1479   // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
1480   // and it's a nuisance. Though some syscalls still return EINTR and there's no
1481   // real standard for which ones. :(
1482   act.sa_flags = SA_RESTART;
1483 #else
1484   act.sa_flags = 0;
1485 #endif
1486   if (sigaction(signum, &act, NULL) != 0) {
1487     LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
1488     return false;
1489   }
1490   return true;
1491 }
1492 #endif  // WEBRTC_POSIX
1493 
1494 #if defined(WEBRTC_WIN)
Wait(int cmsWait,bool process_io)1495 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1496   int cmsTotal = cmsWait;
1497   int cmsElapsed = 0;
1498   uint32 msStart = Time();
1499 
1500   fWait_ = true;
1501   while (fWait_) {
1502     std::vector<WSAEVENT> events;
1503     std::vector<Dispatcher *> event_owners;
1504 
1505     events.push_back(socket_ev_);
1506 
1507     {
1508       CritScope cr(&crit_);
1509       size_t i = 0;
1510       iterators_.push_back(&i);
1511       // Don't track dispatchers_.size(), because we want to pick up any new
1512       // dispatchers that were added while processing the loop.
1513       while (i < dispatchers_.size()) {
1514         Dispatcher* disp = dispatchers_[i++];
1515         if (!process_io && (disp != signal_wakeup_))
1516           continue;
1517         SOCKET s = disp->GetSocket();
1518         if (disp->CheckSignalClose()) {
1519           // We just signalled close, don't poll this socket
1520         } else if (s != INVALID_SOCKET) {
1521           WSAEventSelect(s,
1522                          events[0],
1523                          FlagsToEvents(disp->GetRequestedEvents()));
1524         } else {
1525           events.push_back(disp->GetWSAEvent());
1526           event_owners.push_back(disp);
1527         }
1528       }
1529       ASSERT(iterators_.back() == &i);
1530       iterators_.pop_back();
1531     }
1532 
1533     // Which is shorter, the delay wait or the asked wait?
1534 
1535     int cmsNext;
1536     if (cmsWait == kForever) {
1537       cmsNext = cmsWait;
1538     } else {
1539       cmsNext = _max(0, cmsTotal - cmsElapsed);
1540     }
1541 
1542     // Wait for one of the events to signal
1543     DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
1544                                         &events[0],
1545                                         false,
1546                                         cmsNext,
1547                                         false);
1548 
1549     if (dw == WSA_WAIT_FAILED) {
1550       // Failed?
1551       // TODO: need a better strategy than this!
1552       WSAGetLastError();
1553       ASSERT(false);
1554       return false;
1555     } else if (dw == WSA_WAIT_TIMEOUT) {
1556       // Timeout?
1557       return true;
1558     } else {
1559       // Figure out which one it is and call it
1560       CritScope cr(&crit_);
1561       int index = dw - WSA_WAIT_EVENT_0;
1562       if (index > 0) {
1563         --index; // The first event is the socket event
1564         event_owners[index]->OnPreEvent(0);
1565         event_owners[index]->OnEvent(0, 0);
1566       } else if (process_io) {
1567         size_t i = 0, end = dispatchers_.size();
1568         iterators_.push_back(&i);
1569         iterators_.push_back(&end);  // Don't iterate over new dispatchers.
1570         while (i < end) {
1571           Dispatcher* disp = dispatchers_[i++];
1572           SOCKET s = disp->GetSocket();
1573           if (s == INVALID_SOCKET)
1574             continue;
1575 
1576           WSANETWORKEVENTS wsaEvents;
1577           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1578           if (err == 0) {
1579 
1580 #if LOGGING
1581             {
1582               if ((wsaEvents.lNetworkEvents & FD_READ) &&
1583                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1584                 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
1585                              << wsaEvents.iErrorCode[FD_READ_BIT];
1586               }
1587               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1588                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1589                 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
1590                              << wsaEvents.iErrorCode[FD_WRITE_BIT];
1591               }
1592               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1593                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1594                 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
1595                              << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1596               }
1597               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1598                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1599                 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1600                              << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1601               }
1602               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1603                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1604                 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
1605                              << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1606               }
1607             }
1608 #endif
1609             uint32 ff = 0;
1610             int errcode = 0;
1611             if (wsaEvents.lNetworkEvents & FD_READ)
1612               ff |= DE_READ;
1613             if (wsaEvents.lNetworkEvents & FD_WRITE)
1614               ff |= DE_WRITE;
1615             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1616               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1617                 ff |= DE_CONNECT;
1618               } else {
1619                 ff |= DE_CLOSE;
1620                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1621               }
1622             }
1623             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1624               ff |= DE_ACCEPT;
1625             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1626               ff |= DE_CLOSE;
1627               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1628             }
1629             if (ff != 0) {
1630               disp->OnPreEvent(ff);
1631               disp->OnEvent(ff, errcode);
1632             }
1633           }
1634         }
1635         ASSERT(iterators_.back() == &end);
1636         iterators_.pop_back();
1637         ASSERT(iterators_.back() == &i);
1638         iterators_.pop_back();
1639       }
1640 
1641       // Reset the network event until new activity occurs
1642       WSAResetEvent(socket_ev_);
1643     }
1644 
1645     // Break?
1646     if (!fWait_)
1647       break;
1648     cmsElapsed = TimeSince(msStart);
1649     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1650        break;
1651     }
1652   }
1653 
1654   // Done
1655   return true;
1656 }
1657 #endif  // WEBRTC_WIN
1658 
1659 }  // namespace rtc
1660