• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "rtc_base/physical_socket_server.h"
11 
12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable : 4786)
14 #endif
15 
16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h>
18 #endif
19 
20 #if defined(WEBRTC_POSIX)
21 #include <fcntl.h>
22 #include <string.h>
23 #if defined(WEBRTC_USE_EPOLL)
24 // "poll" will be used to wait for the signal dispatcher.
25 #include <poll.h>
26 #endif
27 #include <sys/ioctl.h>
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #endif
32 
33 #if defined(WEBRTC_WIN)
34 #include <windows.h>
35 #include <winsock2.h>
36 #include <ws2tcpip.h>
37 #undef SetPort
38 #endif
39 
40 #include <errno.h>
41 
42 #include <algorithm>
43 #include <map>
44 
45 #include "rtc_base/arraysize.h"
46 #include "rtc_base/byte_order.h"
47 #include "rtc_base/checks.h"
48 #include "rtc_base/logging.h"
49 #include "rtc_base/network_monitor.h"
50 #include "rtc_base/null_socket_server.h"
51 #include "rtc_base/time_utils.h"
52 
53 #if defined(WEBRTC_LINUX)
54 #include <linux/sockios.h>
55 #endif
56 
57 #if defined(WEBRTC_WIN)
58 #define LAST_SYSTEM_ERROR (::GetLastError())
59 #elif defined(__native_client__) && __native_client__
60 #define LAST_SYSTEM_ERROR (0)
61 #elif defined(WEBRTC_POSIX)
62 #define LAST_SYSTEM_ERROR (errno)
63 #endif  // WEBRTC_WIN
64 
65 #if defined(WEBRTC_POSIX)
66 #include <netinet/tcp.h>  // for TCP_NODELAY
67 #define IP_MTU 14  // Until this is integrated from linux/in.h to netinet/in.h
68 typedef void* SockOptArg;
69 
70 #endif  // WEBRTC_POSIX
71 
72 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(__native_client__)
73 
GetSocketRecvTimestamp(int socket)74 int64_t GetSocketRecvTimestamp(int socket) {
75   struct timeval tv_ioctl;
76   int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
77   if (ret != 0)
78     return -1;
79   int64_t timestamp =
80       rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) +
81       static_cast<int64_t>(tv_ioctl.tv_usec);
82   return timestamp;
83 }
84 
85 #else
86 
// Fallback used on builds where this file does not query SIOCGSTAMP: no
// receive timestamp is available, so every call reports -1.
int64_t GetSocketRecvTimestamp(int socket) {
  static_cast<void>(socket);  // Intentionally unused on this platform.
  constexpr int64_t kNoTimestamp = -1;
  return kNoTimestamp;
}
90 #endif
91 
92 #if defined(WEBRTC_WIN)
93 typedef char* SockOptArg;
94 #endif
95 
96 #if defined(WEBRTC_USE_EPOLL)
97 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
98 #if !defined(POLLRDHUP)
99 #define POLLRDHUP 0x2000
100 #endif
101 #if !defined(EPOLLRDHUP)
102 #define EPOLLRDHUP 0x2000
103 #endif
104 #endif
105 
106 namespace rtc {
107 
CreateDefault()108 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
109 #if defined(__native_client__)
110   return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
111 #else
112   return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
113 #endif
114 }
115 
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s)116 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
117     : ss_(ss),
118       s_(s),
119       error_(0),
120       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
121       resolver_(nullptr) {
122   if (s_ != INVALID_SOCKET) {
123     SetEnabledEvents(DE_READ | DE_WRITE);
124 
125     int type = SOCK_STREAM;
126     socklen_t len = sizeof(type);
127     const int res =
128         getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len);
129     RTC_DCHECK_EQ(0, res);
130     udp_ = (SOCK_DGRAM == type);
131   }
132 }
133 
~PhysicalSocket()134 PhysicalSocket::~PhysicalSocket() {
135   Close();
136 }
137 
Create(int family,int type)138 bool PhysicalSocket::Create(int family, int type) {
139   Close();
140   s_ = ::socket(family, type, 0);
141   udp_ = (SOCK_DGRAM == type);
142   family_ = family;
143   UpdateLastError();
144   if (udp_) {
145     SetEnabledEvents(DE_READ | DE_WRITE);
146   }
147   return s_ != INVALID_SOCKET;
148 }
149 
GetLocalAddress() const150 SocketAddress PhysicalSocket::GetLocalAddress() const {
151   sockaddr_storage addr_storage = {};
152   socklen_t addrlen = sizeof(addr_storage);
153   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
154   int result = ::getsockname(s_, addr, &addrlen);
155   SocketAddress address;
156   if (result >= 0) {
157     SocketAddressFromSockAddrStorage(addr_storage, &address);
158   } else {
159     RTC_LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
160                         << s_;
161   }
162   return address;
163 }
164 
GetRemoteAddress() const165 SocketAddress PhysicalSocket::GetRemoteAddress() const {
166   sockaddr_storage addr_storage = {};
167   socklen_t addrlen = sizeof(addr_storage);
168   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
169   int result = ::getpeername(s_, addr, &addrlen);
170   SocketAddress address;
171   if (result >= 0) {
172     SocketAddressFromSockAddrStorage(addr_storage, &address);
173   } else {
174     RTC_LOG(LS_WARNING)
175         << "GetRemoteAddress: unable to get remote addr, socket=" << s_;
176   }
177   return address;
178 }
179 
Bind(const SocketAddress & bind_addr)180 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
181   SocketAddress copied_bind_addr = bind_addr;
182   // If a network binder is available, use it to bind a socket to an interface
183   // instead of bind(), since this is more reliable on an OS with a weak host
184   // model.
185   if (ss_->network_binder() && !bind_addr.IsAnyIP()) {
186     NetworkBindingResult result =
187         ss_->network_binder()->BindSocketToNetwork(s_, bind_addr.ipaddr());
188     if (result == NetworkBindingResult::SUCCESS) {
189       // Since the network binder handled binding the socket to the desired
190       // network interface, we don't need to (and shouldn't) include an IP in
191       // the bind() call; bind() just needs to assign a port.
192       copied_bind_addr.SetIP(GetAnyIP(copied_bind_addr.ipaddr().family()));
193     } else if (result == NetworkBindingResult::NOT_IMPLEMENTED) {
194       RTC_LOG(LS_INFO) << "Can't bind socket to network because "
195                           "network binding is not implemented for this OS.";
196     } else {
197       if (bind_addr.IsLoopbackIP()) {
198         // If we couldn't bind to a loopback IP (which should only happen in
199         // test scenarios), continue on. This may be expected behavior.
200         RTC_LOG(LS_VERBOSE) << "Binding socket to loopback address"
201                             << " failed; result: " << static_cast<int>(result);
202       } else {
203         RTC_LOG(LS_WARNING) << "Binding socket to network address"
204                             << " failed; result: " << static_cast<int>(result);
205         // If a network binding was attempted and failed, we should stop here
206         // and not try to use the socket. Otherwise, we may end up sending
207         // packets with an invalid source address.
208         // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7026
209         return -1;
210       }
211     }
212   }
213   sockaddr_storage addr_storage;
214   size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage);
215   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
216   int err = ::bind(s_, addr, static_cast<int>(len));
217   UpdateLastError();
218 #if !defined(NDEBUG)
219   if (0 == err) {
220     dbg_addr_ = "Bound @ ";
221     dbg_addr_.append(GetLocalAddress().ToString());
222   }
223 #endif
224   return err;
225 }
226 
Connect(const SocketAddress & addr)227 int PhysicalSocket::Connect(const SocketAddress& addr) {
228   // TODO(pthatcher): Implicit creation is required to reconnect...
229   // ...but should we make it more explicit?
230   if (state_ != CS_CLOSED) {
231     SetError(EALREADY);
232     return SOCKET_ERROR;
233   }
234   if (addr.IsUnresolvedIP()) {
235     RTC_LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
236     resolver_ = new AsyncResolver();
237     resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
238     resolver_->Start(addr);
239     state_ = CS_CONNECTING;
240     return 0;
241   }
242 
243   return DoConnect(addr);
244 }
245 
DoConnect(const SocketAddress & connect_addr)246 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
247   if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
248     return SOCKET_ERROR;
249   }
250   sockaddr_storage addr_storage;
251   size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
252   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
253   int err = ::connect(s_, addr, static_cast<int>(len));
254   UpdateLastError();
255   uint8_t events = DE_READ | DE_WRITE;
256   if (err == 0) {
257     state_ = CS_CONNECTED;
258   } else if (IsBlockingError(GetError())) {
259     state_ = CS_CONNECTING;
260     events |= DE_CONNECT;
261   } else {
262     return SOCKET_ERROR;
263   }
264 
265   EnableEvents(events);
266   return 0;
267 }
268 
GetError() const269 int PhysicalSocket::GetError() const {
270   CritScope cs(&crit_);
271   return error_;
272 }
273 
SetError(int error)274 void PhysicalSocket::SetError(int error) {
275   CritScope cs(&crit_);
276   error_ = error;
277 }
278 
GetState() const279 AsyncSocket::ConnState PhysicalSocket::GetState() const {
280   return state_;
281 }
282 
GetOption(Option opt,int * value)283 int PhysicalSocket::GetOption(Option opt, int* value) {
284   int slevel;
285   int sopt;
286   if (TranslateOption(opt, &slevel, &sopt) == -1)
287     return -1;
288   socklen_t optlen = sizeof(*value);
289   int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
290   if (ret == -1) {
291     return -1;
292   }
293   if (opt == OPT_DONTFRAGMENT) {
294 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
295     *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
296 #endif
297   } else if (opt == OPT_DSCP) {
298 #if defined(WEBRTC_POSIX)
299     // unshift DSCP value to get six most significant bits of IP DiffServ field
300     *value >>= 2;
301 #endif
302   }
303   return ret;
304 }
305 
SetOption(Option opt,int value)306 int PhysicalSocket::SetOption(Option opt, int value) {
307   int slevel;
308   int sopt;
309   if (TranslateOption(opt, &slevel, &sopt) == -1)
310     return -1;
311   if (opt == OPT_DONTFRAGMENT) {
312 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
313     value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
314 #endif
315   } else if (opt == OPT_DSCP) {
316 #if defined(WEBRTC_POSIX)
317     // shift DSCP value to fit six most significant bits of IP DiffServ field
318     value <<= 2;
319 #endif
320   }
321 #if defined(WEBRTC_POSIX)
322   if (sopt == IPV6_TCLASS) {
323     // Set the IPv4 option in all cases to support dual-stack sockets.
324     ::setsockopt(s_, IPPROTO_IP, IP_TOS, (SockOptArg)&value, sizeof(value));
325   }
326 #endif
327   return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
328 }
329 
Send(const void * pv,size_t cb)330 int PhysicalSocket::Send(const void* pv, size_t cb) {
331   int sent = DoSend(
332       s_, reinterpret_cast<const char*>(pv), static_cast<int>(cb),
333 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
334       // Suppress SIGPIPE. Without this, attempting to send on a socket whose
335       // other end is closed will result in a SIGPIPE signal being raised to
336       // our process, which by default will terminate the process, which we
337       // don't want. By specifying this flag, we'll just get the error EPIPE
338       // instead and can handle the error gracefully.
339       MSG_NOSIGNAL
340 #else
341       0
342 #endif
343   );
344   UpdateLastError();
345   MaybeRemapSendError();
346   // We have seen minidumps where this may be false.
347   RTC_DCHECK(sent <= static_cast<int>(cb));
348   if ((sent > 0 && sent < static_cast<int>(cb)) ||
349       (sent < 0 && IsBlockingError(GetError()))) {
350     EnableEvents(DE_WRITE);
351   }
352   return sent;
353 }
354 
SendTo(const void * buffer,size_t length,const SocketAddress & addr)355 int PhysicalSocket::SendTo(const void* buffer,
356                            size_t length,
357                            const SocketAddress& addr) {
358   sockaddr_storage saddr;
359   size_t len = addr.ToSockAddrStorage(&saddr);
360   int sent =
361       DoSendTo(s_, static_cast<const char*>(buffer), static_cast<int>(length),
362 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
363                // Suppress SIGPIPE. See above for explanation.
364                MSG_NOSIGNAL,
365 #else
366                0,
367 #endif
368                reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
369   UpdateLastError();
370   MaybeRemapSendError();
371   // We have seen minidumps where this may be false.
372   RTC_DCHECK(sent <= static_cast<int>(length));
373   if ((sent > 0 && sent < static_cast<int>(length)) ||
374       (sent < 0 && IsBlockingError(GetError()))) {
375     EnableEvents(DE_WRITE);
376   }
377   return sent;
378 }
379 
Recv(void * buffer,size_t length,int64_t * timestamp)380 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
381   int received =
382       ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
383   if ((received == 0) && (length != 0)) {
384     // Note: on graceful shutdown, recv can return 0.  In this case, we
385     // pretend it is blocking, and then signal close, so that simplifying
386     // assumptions can be made about Recv.
387     RTC_LOG(LS_WARNING) << "EOF from socket; deferring close event";
388     // Must turn this back on so that the select() loop will notice the close
389     // event.
390     EnableEvents(DE_READ);
391     SetError(EWOULDBLOCK);
392     return SOCKET_ERROR;
393   }
394   if (timestamp) {
395     *timestamp = GetSocketRecvTimestamp(s_);
396   }
397   UpdateLastError();
398   int error = GetError();
399   bool success = (received >= 0) || IsBlockingError(error);
400   if (udp_ || success) {
401     EnableEvents(DE_READ);
402   }
403   if (!success) {
404     RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
405   }
406   return received;
407 }
408 
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr,int64_t * timestamp)409 int PhysicalSocket::RecvFrom(void* buffer,
410                              size_t length,
411                              SocketAddress* out_addr,
412                              int64_t* timestamp) {
413   sockaddr_storage addr_storage;
414   socklen_t addr_len = sizeof(addr_storage);
415   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
416   int received = ::recvfrom(s_, static_cast<char*>(buffer),
417                             static_cast<int>(length), 0, addr, &addr_len);
418   if (timestamp) {
419     *timestamp = GetSocketRecvTimestamp(s_);
420   }
421   UpdateLastError();
422   if ((received >= 0) && (out_addr != nullptr))
423     SocketAddressFromSockAddrStorage(addr_storage, out_addr);
424   int error = GetError();
425   bool success = (received >= 0) || IsBlockingError(error);
426   if (udp_ || success) {
427     EnableEvents(DE_READ);
428   }
429   if (!success) {
430     RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
431   }
432   return received;
433 }
434 
Listen(int backlog)435 int PhysicalSocket::Listen(int backlog) {
436   int err = ::listen(s_, backlog);
437   UpdateLastError();
438   if (err == 0) {
439     state_ = CS_CONNECTING;
440     EnableEvents(DE_ACCEPT);
441 #if !defined(NDEBUG)
442     dbg_addr_ = "Listening @ ";
443     dbg_addr_.append(GetLocalAddress().ToString());
444 #endif
445   }
446   return err;
447 }
448 
Accept(SocketAddress * out_addr)449 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
450   // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
451   // trigger an event even if DoAccept returns an error here.
452   EnableEvents(DE_ACCEPT);
453   sockaddr_storage addr_storage;
454   socklen_t addr_len = sizeof(addr_storage);
455   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
456   SOCKET s = DoAccept(s_, addr, &addr_len);
457   UpdateLastError();
458   if (s == INVALID_SOCKET)
459     return nullptr;
460   if (out_addr != nullptr)
461     SocketAddressFromSockAddrStorage(addr_storage, out_addr);
462   return ss_->WrapSocket(s);
463 }
464 
Close()465 int PhysicalSocket::Close() {
466   if (s_ == INVALID_SOCKET)
467     return 0;
468   int err = ::closesocket(s_);
469   UpdateLastError();
470   s_ = INVALID_SOCKET;
471   state_ = CS_CLOSED;
472   SetEnabledEvents(0);
473   if (resolver_) {
474     resolver_->Destroy(false);
475     resolver_ = nullptr;
476   }
477   return err;
478 }
479 
DoAccept(SOCKET socket,sockaddr * addr,socklen_t * addrlen)480 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
481                                 sockaddr* addr,
482                                 socklen_t* addrlen) {
483   return ::accept(socket, addr, addrlen);
484 }
485 
DoSend(SOCKET socket,const char * buf,int len,int flags)486 int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) {
487   return ::send(socket, buf, len, flags);
488 }
489 
DoSendTo(SOCKET socket,const char * buf,int len,int flags,const struct sockaddr * dest_addr,socklen_t addrlen)490 int PhysicalSocket::DoSendTo(SOCKET socket,
491                              const char* buf,
492                              int len,
493                              int flags,
494                              const struct sockaddr* dest_addr,
495                              socklen_t addrlen) {
496   return ::sendto(socket, buf, len, flags, dest_addr, addrlen);
497 }
498 
OnResolveResult(AsyncResolverInterface * resolver)499 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
500   if (resolver != resolver_) {
501     return;
502   }
503 
504   int error = resolver_->GetError();
505   if (error == 0) {
506     error = DoConnect(resolver_->address());
507   } else {
508     Close();
509   }
510 
511   if (error) {
512     SetError(error);
513     SignalCloseEvent(this, error);
514   }
515 }
516 
UpdateLastError()517 void PhysicalSocket::UpdateLastError() {
518   SetError(LAST_SYSTEM_ERROR);
519 }
520 
MaybeRemapSendError()521 void PhysicalSocket::MaybeRemapSendError() {
522 #if defined(WEBRTC_MAC)
523   // https://developer.apple.com/library/mac/documentation/Darwin/
524   // Reference/ManPages/man2/sendto.2.html
525   // ENOBUFS - The output queue for a network interface is full.
526   // This generally indicates that the interface has stopped sending,
527   // but may be caused by transient congestion.
528   if (GetError() == ENOBUFS) {
529     SetError(EWOULDBLOCK);
530   }
531 #endif
532 }
533 
SetEnabledEvents(uint8_t events)534 void PhysicalSocket::SetEnabledEvents(uint8_t events) {
535   enabled_events_ = events;
536 }
537 
EnableEvents(uint8_t events)538 void PhysicalSocket::EnableEvents(uint8_t events) {
539   enabled_events_ |= events;
540 }
541 
DisableEvents(uint8_t events)542 void PhysicalSocket::DisableEvents(uint8_t events) {
543   enabled_events_ &= ~events;
544 }
545 
TranslateOption(Option opt,int * slevel,int * sopt)546 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
547   switch (opt) {
548     case OPT_DONTFRAGMENT:
549 #if defined(WEBRTC_WIN)
550       *slevel = IPPROTO_IP;
551       *sopt = IP_DONTFRAGMENT;
552       break;
553 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
554       RTC_LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
555       return -1;
556 #elif defined(WEBRTC_POSIX)
557       *slevel = IPPROTO_IP;
558       *sopt = IP_MTU_DISCOVER;
559       break;
560 #endif
561     case OPT_RCVBUF:
562       *slevel = SOL_SOCKET;
563       *sopt = SO_RCVBUF;
564       break;
565     case OPT_SNDBUF:
566       *slevel = SOL_SOCKET;
567       *sopt = SO_SNDBUF;
568       break;
569     case OPT_NODELAY:
570       *slevel = IPPROTO_TCP;
571       *sopt = TCP_NODELAY;
572       break;
573     case OPT_DSCP:
574 #if defined(WEBRTC_POSIX)
575       if (family_ == AF_INET6) {
576         *slevel = IPPROTO_IPV6;
577         *sopt = IPV6_TCLASS;
578       } else {
579         *slevel = IPPROTO_IP;
580         *sopt = IP_TOS;
581       }
582       break;
583 #else
584       RTC_LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
585       return -1;
586 #endif
587     case OPT_RTP_SENDTIME_EXTN_ID:
588       return -1;  // No logging is necessary as this not a OS socket option.
589     default:
590       RTC_NOTREACHED();
591       return -1;
592   }
593   return 0;
594 }
595 
SocketDispatcher(PhysicalSocketServer * ss)596 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
597 #if defined(WEBRTC_WIN)
598     : PhysicalSocket(ss),
599       id_(0),
600       signal_close_(false)
601 #else
602     : PhysicalSocket(ss)
603 #endif
604 {
605 }
606 
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)607 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
608 #if defined(WEBRTC_WIN)
609     : PhysicalSocket(ss, s),
610       id_(0),
611       signal_close_(false)
612 #else
613     : PhysicalSocket(ss, s)
614 #endif
615 {
616 }
617 
~SocketDispatcher()618 SocketDispatcher::~SocketDispatcher() {
619   Close();
620 }
621 
Initialize()622 bool SocketDispatcher::Initialize() {
623   RTC_DCHECK(s_ != INVALID_SOCKET);
624 // Must be a non-blocking
625 #if defined(WEBRTC_WIN)
626   u_long argp = 1;
627   ioctlsocket(s_, FIONBIO, &argp);
628 #elif defined(WEBRTC_POSIX)
629   fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
630 #endif
631 #if defined(WEBRTC_IOS)
632   // iOS may kill sockets when the app is moved to the background
633   // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When
634   // we attempt to write to such a socket, SIGPIPE will be raised, which by
635   // default will terminate the process, which we don't want. By specifying
636   // this socket option, SIGPIPE will be disabled for the socket.
637   int value = 1;
638   ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
639 #endif
640   ss_->Add(this);
641   return true;
642 }
643 
Create(int type)644 bool SocketDispatcher::Create(int type) {
645   return Create(AF_INET, type);
646 }
647 
Create(int family,int type)648 bool SocketDispatcher::Create(int family, int type) {
649   // Change the socket to be non-blocking.
650   if (!PhysicalSocket::Create(family, type))
651     return false;
652 
653   if (!Initialize())
654     return false;
655 
656 #if defined(WEBRTC_WIN)
657   do {
658     id_ = ++next_id_;
659   } while (id_ == 0);
660 #endif
661   return true;
662 }
663 
664 #if defined(WEBRTC_WIN)
665 
GetWSAEvent()666 WSAEVENT SocketDispatcher::GetWSAEvent() {
667   return WSA_INVALID_EVENT;
668 }
669 
GetSocket()670 SOCKET SocketDispatcher::GetSocket() {
671   return s_;
672 }
673 
CheckSignalClose()674 bool SocketDispatcher::CheckSignalClose() {
675   if (!signal_close_)
676     return false;
677 
678   char ch;
679   if (recv(s_, &ch, 1, MSG_PEEK) > 0)
680     return false;
681 
682   state_ = CS_CLOSED;
683   signal_close_ = false;
684   SignalCloseEvent(this, signal_err_);
685   return true;
686 }
687 
688 int SocketDispatcher::next_id_ = 0;
689 
690 #elif defined(WEBRTC_POSIX)
691 
GetDescriptor()692 int SocketDispatcher::GetDescriptor() {
693   return s_;
694 }
695 
IsDescriptorClosed()696 bool SocketDispatcher::IsDescriptorClosed() {
697   if (udp_) {
698     // The MSG_PEEK trick doesn't work for UDP, since (at least in some
699     // circumstances) it requires reading an entire UDP packet, which would be
700     // bad for performance here. So, just check whether |s_| has been closed,
701     // which should be sufficient.
702     return s_ == INVALID_SOCKET;
703   }
704   // We don't have a reliable way of distinguishing end-of-stream
705   // from readability.  So test on each readable call.  Is this
706   // inefficient?  Probably.
707   char ch;
708   ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
709   if (res > 0) {
710     // Data available, so not closed.
711     return false;
712   } else if (res == 0) {
713     // EOF, so closed.
714     return true;
715   } else {  // error
716     switch (errno) {
717       // Returned if we've already closed s_.
718       case EBADF:
719       // Returned during ungraceful peer shutdown.
720       case ECONNRESET:
721         return true;
722       // The normal blocking error; don't log anything.
723       case EWOULDBLOCK:
724       // Interrupted system call.
725       case EINTR:
726         return false;
727       default:
728         // Assume that all other errors are just blocking errors, meaning the
729         // connection is still good but we just can't read from it right now.
730         // This should only happen when connecting (and at most once), because
731         // in all other cases this function is only called if the file
732         // descriptor is already known to be in the readable state. However,
733         // it's not necessary a problem if we spuriously interpret a
734         // "connection lost"-type error as a blocking error, because typically
735         // the next recv() will get EOF, so we'll still eventually notice that
736         // the socket is closed.
737         RTC_LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
738         return false;
739     }
740   }
741 }
742 
743 #endif  // WEBRTC_POSIX
744 
GetRequestedEvents()745 uint32_t SocketDispatcher::GetRequestedEvents() {
746   return enabled_events();
747 }
748 
OnPreEvent(uint32_t ff)749 void SocketDispatcher::OnPreEvent(uint32_t ff) {
750   if ((ff & DE_CONNECT) != 0)
751     state_ = CS_CONNECTED;
752 
753 #if defined(WEBRTC_WIN)
754 // We set CS_CLOSED from CheckSignalClose.
755 #elif defined(WEBRTC_POSIX)
756   if ((ff & DE_CLOSE) != 0)
757     state_ = CS_CLOSED;
758 #endif
759 }
760 
761 #if defined(WEBRTC_WIN)
762 
OnEvent(uint32_t ff,int err)763 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
764   int cache_id = id_;
765   // Make sure we deliver connect/accept first. Otherwise, consumers may see
766   // something like a READ followed by a CONNECT, which would be odd.
767   if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
768     if (ff != DE_CONNECT)
769       RTC_LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
770     DisableEvents(DE_CONNECT);
771 #if !defined(NDEBUG)
772     dbg_addr_ = "Connected @ ";
773     dbg_addr_.append(GetRemoteAddress().ToString());
774 #endif
775     SignalConnectEvent(this);
776   }
777   if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
778     DisableEvents(DE_ACCEPT);
779     SignalReadEvent(this);
780   }
781   if ((ff & DE_READ) != 0) {
782     DisableEvents(DE_READ);
783     SignalReadEvent(this);
784   }
785   if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
786     DisableEvents(DE_WRITE);
787     SignalWriteEvent(this);
788   }
789   if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
790     signal_close_ = true;
791     signal_err_ = err;
792   }
793 }
794 
795 #elif defined(WEBRTC_POSIX)
796 
OnEvent(uint32_t ff,int err)797 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
798 #if defined(WEBRTC_USE_EPOLL)
799   // Remember currently enabled events so we can combine multiple changes
800   // into one update call later.
801   // The signal handlers might re-enable events disabled here, so we can't
802   // keep a list of events to disable at the end of the method. This list
803   // would not be updated with the events enabled by the signal handlers.
804   StartBatchedEventUpdates();
805 #endif
806   // Make sure we deliver connect/accept first. Otherwise, consumers may see
807   // something like a READ followed by a CONNECT, which would be odd.
808   if ((ff & DE_CONNECT) != 0) {
809     DisableEvents(DE_CONNECT);
810     SignalConnectEvent(this);
811   }
812   if ((ff & DE_ACCEPT) != 0) {
813     DisableEvents(DE_ACCEPT);
814     SignalReadEvent(this);
815   }
816   if ((ff & DE_READ) != 0) {
817     DisableEvents(DE_READ);
818     SignalReadEvent(this);
819   }
820   if ((ff & DE_WRITE) != 0) {
821     DisableEvents(DE_WRITE);
822     SignalWriteEvent(this);
823   }
824   if ((ff & DE_CLOSE) != 0) {
825     // The socket is now dead to us, so stop checking it.
826     SetEnabledEvents(0);
827     SignalCloseEvent(this, err);
828   }
829 #if defined(WEBRTC_USE_EPOLL)
830   FinishBatchedEventUpdates();
831 #endif
832 }
833 
834 #endif  // WEBRTC_POSIX
835 
836 #if defined(WEBRTC_USE_EPOLL)
837 
GetEpollEvents(uint32_t ff)838 static int GetEpollEvents(uint32_t ff) {
839   int events = 0;
840   if (ff & (DE_READ | DE_ACCEPT)) {
841     events |= EPOLLIN;
842   }
843   if (ff & (DE_WRITE | DE_CONNECT)) {
844     events |= EPOLLOUT;
845   }
846   return events;
847 }
848 
StartBatchedEventUpdates()849 void SocketDispatcher::StartBatchedEventUpdates() {
850   RTC_DCHECK_EQ(saved_enabled_events_, -1);
851   saved_enabled_events_ = enabled_events();
852 }
853 
FinishBatchedEventUpdates()854 void SocketDispatcher::FinishBatchedEventUpdates() {
855   RTC_DCHECK_NE(saved_enabled_events_, -1);
856   uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
857   saved_enabled_events_ = -1;
858   MaybeUpdateDispatcher(old_events);
859 }
860 
MaybeUpdateDispatcher(uint8_t old_events)861 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
862   if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
863       saved_enabled_events_ == -1) {
864     ss_->Update(this);
865   }
866 }
867 
SetEnabledEvents(uint8_t events)868 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
869   uint8_t old_events = enabled_events();
870   PhysicalSocket::SetEnabledEvents(events);
871   MaybeUpdateDispatcher(old_events);
872 }
873 
EnableEvents(uint8_t events)874 void SocketDispatcher::EnableEvents(uint8_t events) {
875   uint8_t old_events = enabled_events();
876   PhysicalSocket::EnableEvents(events);
877   MaybeUpdateDispatcher(old_events);
878 }
879 
DisableEvents(uint8_t events)880 void SocketDispatcher::DisableEvents(uint8_t events) {
881   uint8_t old_events = enabled_events();
882   PhysicalSocket::DisableEvents(events);
883   MaybeUpdateDispatcher(old_events);
884 }
885 
886 #endif  // WEBRTC_USE_EPOLL
887 
Close()888 int SocketDispatcher::Close() {
889   if (s_ == INVALID_SOCKET)
890     return 0;
891 
892 #if defined(WEBRTC_WIN)
893   id_ = 0;
894   signal_close_ = false;
895 #endif
896 #if defined(WEBRTC_USE_EPOLL)
897   // If we're batching events, the socket can be closed and reopened
898   // during the batch. Set saved_enabled_events_ to 0 here so the new
899   // socket, if any, has the correct old events bitfield
900   if (saved_enabled_events_ != -1) {
901     saved_enabled_events_ = 0;
902   }
903 #endif
904   ss_->Remove(this);
905   return PhysicalSocket::Close();
906 }
907 
908 #if defined(WEBRTC_POSIX)
909 class EventDispatcher : public Dispatcher {
910  public:
EventDispatcher(PhysicalSocketServer * ss)911   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
912     if (pipe(afd_) < 0)
913       RTC_LOG(LERROR) << "pipe failed";
914     ss_->Add(this);
915   }
916 
~EventDispatcher()917   ~EventDispatcher() override {
918     ss_->Remove(this);
919     close(afd_[0]);
920     close(afd_[1]);
921   }
922 
Signal()923   virtual void Signal() {
924     CritScope cs(&crit_);
925     if (!fSignaled_) {
926       const uint8_t b[1] = {0};
927       const ssize_t res = write(afd_[1], b, sizeof(b));
928       RTC_DCHECK_EQ(1, res);
929       fSignaled_ = true;
930     }
931   }
932 
GetRequestedEvents()933   uint32_t GetRequestedEvents() override { return DE_READ; }
934 
OnPreEvent(uint32_t ff)935   void OnPreEvent(uint32_t ff) override {
936     // It is not possible to perfectly emulate an auto-resetting event with
937     // pipes.  This simulates it by resetting before the event is handled.
938 
939     CritScope cs(&crit_);
940     if (fSignaled_) {
941       uint8_t b[4];  // Allow for reading more than 1 byte, but expect 1.
942       const ssize_t res = read(afd_[0], b, sizeof(b));
943       RTC_DCHECK_EQ(1, res);
944       fSignaled_ = false;
945     }
946   }
947 
OnEvent(uint32_t ff,int err)948   void OnEvent(uint32_t ff, int err) override { RTC_NOTREACHED(); }
949 
GetDescriptor()950   int GetDescriptor() override { return afd_[0]; }
951 
IsDescriptorClosed()952   bool IsDescriptorClosed() override { return false; }
953 
954  private:
955   PhysicalSocketServer* ss_;
956   int afd_[2];
957   bool fSignaled_;
958   RecursiveCriticalSection crit_;
959 };
960 
961 #endif  // WEBRTC_POSIX
962 
963 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32_t events)964 static uint32_t FlagsToEvents(uint32_t events) {
965   uint32_t ffFD = FD_CLOSE;
966   if (events & DE_READ)
967     ffFD |= FD_READ;
968   if (events & DE_WRITE)
969     ffFD |= FD_WRITE;
970   if (events & DE_CONNECT)
971     ffFD |= FD_CONNECT;
972   if (events & DE_ACCEPT)
973     ffFD |= FD_ACCEPT;
974   return ffFD;
975 }
976 
977 class EventDispatcher : public Dispatcher {
978  public:
EventDispatcher(PhysicalSocketServer * ss)979   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss) {
980     hev_ = WSACreateEvent();
981     if (hev_) {
982       ss_->Add(this);
983     }
984   }
985 
~EventDispatcher()986   ~EventDispatcher() override {
987     if (hev_ != nullptr) {
988       ss_->Remove(this);
989       WSACloseEvent(hev_);
990       hev_ = nullptr;
991     }
992   }
993 
Signal()994   virtual void Signal() {
995     if (hev_ != nullptr)
996       WSASetEvent(hev_);
997   }
998 
GetRequestedEvents()999   uint32_t GetRequestedEvents() override { return 0; }
1000 
OnPreEvent(uint32_t ff)1001   void OnPreEvent(uint32_t ff) override { WSAResetEvent(hev_); }
1002 
OnEvent(uint32_t ff,int err)1003   void OnEvent(uint32_t ff, int err) override {}
1004 
GetWSAEvent()1005   WSAEVENT GetWSAEvent() override { return hev_; }
1006 
GetSocket()1007   SOCKET GetSocket() override { return INVALID_SOCKET; }
1008 
CheckSignalClose()1009   bool CheckSignalClose() override { return false; }
1010 
1011  private:
1012   PhysicalSocketServer* ss_;
1013   WSAEVENT hev_;
1014 };
1015 #endif  // WEBRTC_WIN
1016 
1017 // Sets the value of a boolean value to false when signaled.
1018 class Signaler : public EventDispatcher {
1019  public:
Signaler(PhysicalSocketServer * ss,bool * pf)1020   Signaler(PhysicalSocketServer* ss, bool* pf) : EventDispatcher(ss), pf_(pf) {}
~Signaler()1021   ~Signaler() override {}
1022 
OnEvent(uint32_t ff,int err)1023   void OnEvent(uint32_t ff, int err) override {
1024     if (pf_)
1025       *pf_ = false;
1026   }
1027 
1028  private:
1029   bool* pf_;
1030 };
1031 
PhysicalSocketServer()1032 PhysicalSocketServer::PhysicalSocketServer()
1033     :
1034 #if defined(WEBRTC_USE_EPOLL)
1035       // Since Linux 2.6.8, the size argument is ignored, but must be greater
1036       // than zero. Before that the size served as hint to the kernel for the
1037       // amount of space to initially allocate in internal data structures.
1038       epoll_fd_(epoll_create(FD_SETSIZE)),
1039 #endif
1040 #if defined(WEBRTC_WIN)
1041       socket_ev_(WSACreateEvent()),
1042 #endif
1043       fWait_(false) {
1044 #if defined(WEBRTC_USE_EPOLL)
1045   if (epoll_fd_ == -1) {
1046     // Not an error, will fall back to "select" below.
1047     RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1048     // Note that -1 == INVALID_SOCKET, the alias used by later checks.
1049   }
1050 #endif
1051   signal_wakeup_ = new Signaler(this, &fWait_);
1052 }
1053 
~PhysicalSocketServer()1054 PhysicalSocketServer::~PhysicalSocketServer() {
1055 #if defined(WEBRTC_WIN)
1056   WSACloseEvent(socket_ev_);
1057 #endif
1058   delete signal_wakeup_;
1059 #if defined(WEBRTC_USE_EPOLL)
1060   if (epoll_fd_ != INVALID_SOCKET) {
1061     close(epoll_fd_);
1062   }
1063 #endif
1064   RTC_DCHECK(dispatchers_.empty());
1065 }
1066 
WakeUp()1067 void PhysicalSocketServer::WakeUp() {
1068   signal_wakeup_->Signal();
1069 }
1070 
CreateSocket(int family,int type)1071 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1072   PhysicalSocket* socket = new PhysicalSocket(this);
1073   if (socket->Create(family, type)) {
1074     return socket;
1075   } else {
1076     delete socket;
1077     return nullptr;
1078   }
1079 }
1080 
CreateAsyncSocket(int family,int type)1081 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1082   SocketDispatcher* dispatcher = new SocketDispatcher(this);
1083   if (dispatcher->Create(family, type)) {
1084     return dispatcher;
1085   } else {
1086     delete dispatcher;
1087     return nullptr;
1088   }
1089 }
1090 
WrapSocket(SOCKET s)1091 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1092   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1093   if (dispatcher->Initialize()) {
1094     return dispatcher;
1095   } else {
1096     delete dispatcher;
1097     return nullptr;
1098   }
1099 }
1100 
Add(Dispatcher * pdispatcher)1101 void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
1102   CritScope cs(&crit_);
1103   if (processing_dispatchers_) {
1104     // A dispatcher is being added while a "Wait" call is processing the
1105     // list of socket events.
1106     // Defer adding to "dispatchers_" set until processing is done to avoid
1107     // invalidating the iterator in "Wait".
1108     pending_remove_dispatchers_.erase(pdispatcher);
1109     pending_add_dispatchers_.insert(pdispatcher);
1110   } else {
1111     dispatchers_.insert(pdispatcher);
1112   }
1113 #if defined(WEBRTC_USE_EPOLL)
1114   if (epoll_fd_ != INVALID_SOCKET) {
1115     AddEpoll(pdispatcher);
1116   }
1117 #endif  // WEBRTC_USE_EPOLL
1118 }
1119 
Remove(Dispatcher * pdispatcher)1120 void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
1121   CritScope cs(&crit_);
1122   if (processing_dispatchers_) {
1123     // A dispatcher is being removed while a "Wait" call is processing the
1124     // list of socket events.
1125     // Defer removal from "dispatchers_" set until processing is done to avoid
1126     // invalidating the iterator in "Wait".
1127     if (!pending_add_dispatchers_.erase(pdispatcher) &&
1128         dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1129       RTC_LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1130                              "dispatcher, potentially from a duplicate call to "
1131                              "Add.";
1132       return;
1133     }
1134 
1135     pending_remove_dispatchers_.insert(pdispatcher);
1136   } else if (!dispatchers_.erase(pdispatcher)) {
1137     RTC_LOG(LS_WARNING)
1138         << "PhysicalSocketServer asked to remove a unknown "
1139            "dispatcher, potentially from a duplicate call to Add.";
1140     return;
1141   }
1142 #if defined(WEBRTC_USE_EPOLL)
1143   if (epoll_fd_ != INVALID_SOCKET) {
1144     RemoveEpoll(pdispatcher);
1145   }
1146 #endif  // WEBRTC_USE_EPOLL
1147 }
1148 
Update(Dispatcher * pdispatcher)1149 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
1150 #if defined(WEBRTC_USE_EPOLL)
1151   if (epoll_fd_ == INVALID_SOCKET) {
1152     return;
1153   }
1154 
1155   CritScope cs(&crit_);
1156   if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1157     return;
1158   }
1159 
1160   UpdateEpoll(pdispatcher);
1161 #endif
1162 }
1163 
AddRemovePendingDispatchers()1164 void PhysicalSocketServer::AddRemovePendingDispatchers() {
1165   if (!pending_add_dispatchers_.empty()) {
1166     for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
1167       dispatchers_.insert(pdispatcher);
1168     }
1169     pending_add_dispatchers_.clear();
1170   }
1171 
1172   if (!pending_remove_dispatchers_.empty()) {
1173     for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
1174       dispatchers_.erase(pdispatcher);
1175     }
1176     pending_remove_dispatchers_.clear();
1177   }
1178 }
1179 
1180 #if defined(WEBRTC_POSIX)
1181 
Wait(int cmsWait,bool process_io)1182 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1183 #if defined(WEBRTC_USE_EPOLL)
1184   // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1185   // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1186   // "select" to support sockets larger than FD_SETSIZE.
1187   if (!process_io) {
1188     return WaitPoll(cmsWait, signal_wakeup_);
1189   } else if (epoll_fd_ != INVALID_SOCKET) {
1190     return WaitEpoll(cmsWait);
1191   }
1192 #endif
1193   return WaitSelect(cmsWait, process_io);
1194 }
1195 
ProcessEvents(Dispatcher * dispatcher,bool readable,bool writable,bool check_error)1196 static void ProcessEvents(Dispatcher* dispatcher,
1197                           bool readable,
1198                           bool writable,
1199                           bool check_error) {
1200   int errcode = 0;
1201   // TODO(pthatcher): Should we set errcode if getsockopt fails?
1202   if (check_error) {
1203     socklen_t len = sizeof(errcode);
1204     ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1205                  &len);
1206   }
1207 
1208   uint32_t ff = 0;
1209 
1210   // Check readable descriptors. If we're waiting on an accept, signal
1211   // that. Otherwise we're waiting for data, check to see if we're
1212   // readable or really closed.
1213   // TODO(pthatcher): Only peek at TCP descriptors.
1214   if (readable) {
1215     if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
1216       ff |= DE_ACCEPT;
1217     } else if (errcode || dispatcher->IsDescriptorClosed()) {
1218       ff |= DE_CLOSE;
1219     } else {
1220       ff |= DE_READ;
1221     }
1222   }
1223 
1224   // Check writable descriptors. If we're waiting on a connect, detect
1225   // success versus failure by the reaped error code.
1226   if (writable) {
1227     if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
1228       if (!errcode) {
1229         ff |= DE_CONNECT;
1230       } else {
1231         ff |= DE_CLOSE;
1232       }
1233     } else {
1234       ff |= DE_WRITE;
1235     }
1236   }
1237 
1238   // Tell the descriptor about the event.
1239   if (ff != 0) {
1240     dispatcher->OnPreEvent(ff);
1241     dispatcher->OnEvent(ff, errcode);
1242   }
1243 }
1244 
WaitSelect(int cmsWait,bool process_io)1245 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1246   // Calculate timing information
1247 
1248   struct timeval* ptvWait = nullptr;
1249   struct timeval tvWait;
1250   int64_t stop_us;
1251   if (cmsWait != kForever) {
1252     // Calculate wait timeval
1253     tvWait.tv_sec = cmsWait / 1000;
1254     tvWait.tv_usec = (cmsWait % 1000) * 1000;
1255     ptvWait = &tvWait;
1256 
1257     // Calculate when to return
1258     stop_us = rtc::TimeMicros() + cmsWait * 1000;
1259   }
1260 
1261   // Zero all fd_sets. Don't need to do this inside the loop since
1262   // select() zeros the descriptors not signaled
1263 
1264   fd_set fdsRead;
1265   FD_ZERO(&fdsRead);
1266   fd_set fdsWrite;
1267   FD_ZERO(&fdsWrite);
1268 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
1269 // inline assembly in FD_ZERO.
1270 // http://crbug.com/344505
1271 #ifdef MEMORY_SANITIZER
1272   __msan_unpoison(&fdsRead, sizeof(fdsRead));
1273   __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1274 #endif
1275 
1276   fWait_ = true;
1277 
1278   while (fWait_) {
1279     int fdmax = -1;
1280     {
1281       CritScope cr(&crit_);
1282       // TODO(jbauch): Support re-entrant waiting.
1283       RTC_DCHECK(!processing_dispatchers_);
1284       for (Dispatcher* pdispatcher : dispatchers_) {
1285         // Query dispatchers for read and write wait state
1286         RTC_DCHECK(pdispatcher);
1287         if (!process_io && (pdispatcher != signal_wakeup_))
1288           continue;
1289         int fd = pdispatcher->GetDescriptor();
1290         // "select"ing a file descriptor that is equal to or larger than
1291         // FD_SETSIZE will result in undefined behavior.
1292         RTC_DCHECK_LT(fd, FD_SETSIZE);
1293         if (fd > fdmax)
1294           fdmax = fd;
1295 
1296         uint32_t ff = pdispatcher->GetRequestedEvents();
1297         if (ff & (DE_READ | DE_ACCEPT))
1298           FD_SET(fd, &fdsRead);
1299         if (ff & (DE_WRITE | DE_CONNECT))
1300           FD_SET(fd, &fdsWrite);
1301       }
1302     }
1303 
1304     // Wait then call handlers as appropriate
1305     // < 0 means error
1306     // 0 means timeout
1307     // > 0 means count of descriptors ready
1308     int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait);
1309 
1310     // If error, return error.
1311     if (n < 0) {
1312       if (errno != EINTR) {
1313         RTC_LOG_E(LS_ERROR, EN, errno) << "select";
1314         return false;
1315       }
1316       // Else ignore the error and keep going. If this EINTR was for one of the
1317       // signals managed by this PhysicalSocketServer, the
1318       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1319       // iteration.
1320     } else if (n == 0) {
1321       // If timeout, return success
1322       return true;
1323     } else {
1324       // We have signaled descriptors
1325       CritScope cr(&crit_);
1326       processing_dispatchers_ = true;
1327       for (Dispatcher* pdispatcher : dispatchers_) {
1328         int fd = pdispatcher->GetDescriptor();
1329 
1330         bool readable = FD_ISSET(fd, &fdsRead);
1331         if (readable) {
1332           FD_CLR(fd, &fdsRead);
1333         }
1334 
1335         bool writable = FD_ISSET(fd, &fdsWrite);
1336         if (writable) {
1337           FD_CLR(fd, &fdsWrite);
1338         }
1339 
1340         // The error code can be signaled through reads or writes.
1341         ProcessEvents(pdispatcher, readable, writable, readable || writable);
1342       }
1343 
1344       processing_dispatchers_ = false;
1345       // Process deferred dispatchers that have been added/removed while the
1346       // events were handled above.
1347       AddRemovePendingDispatchers();
1348     }
1349 
1350     // Recalc the time remaining to wait. Doing it here means it doesn't get
1351     // calced twice the first time through the loop
1352     if (ptvWait) {
1353       ptvWait->tv_sec = 0;
1354       ptvWait->tv_usec = 0;
1355       int64_t time_left_us = stop_us - rtc::TimeMicros();
1356       if (time_left_us > 0) {
1357         ptvWait->tv_sec = time_left_us / rtc::kNumMicrosecsPerSec;
1358         ptvWait->tv_usec = time_left_us % rtc::kNumMicrosecsPerSec;
1359       }
1360     }
1361   }
1362 
1363   return true;
1364 }
1365 
1366 #if defined(WEBRTC_USE_EPOLL)
1367 
AddEpoll(Dispatcher * pdispatcher)1368 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1369   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1370   int fd = pdispatcher->GetDescriptor();
1371   RTC_DCHECK(fd != INVALID_SOCKET);
1372   if (fd == INVALID_SOCKET) {
1373     return;
1374   }
1375 
1376   struct epoll_event event = {0};
1377   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1378   event.data.ptr = pdispatcher;
1379   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1380   RTC_DCHECK_EQ(err, 0);
1381   if (err == -1) {
1382     RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1383   }
1384 }
1385 
RemoveEpoll(Dispatcher * pdispatcher)1386 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1387   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1388   int fd = pdispatcher->GetDescriptor();
1389   RTC_DCHECK(fd != INVALID_SOCKET);
1390   if (fd == INVALID_SOCKET) {
1391     return;
1392   }
1393 
1394   struct epoll_event event = {0};
1395   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1396   RTC_DCHECK(err == 0 || errno == ENOENT);
1397   if (err == -1) {
1398     if (errno == ENOENT) {
1399       // Socket has already been closed.
1400       RTC_LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1401     } else {
1402       RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1403     }
1404   }
1405 }
1406 
UpdateEpoll(Dispatcher * pdispatcher)1407 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1408   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1409   int fd = pdispatcher->GetDescriptor();
1410   RTC_DCHECK(fd != INVALID_SOCKET);
1411   if (fd == INVALID_SOCKET) {
1412     return;
1413   }
1414 
1415   struct epoll_event event = {0};
1416   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1417   event.data.ptr = pdispatcher;
1418   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1419   RTC_DCHECK_EQ(err, 0);
1420   if (err == -1) {
1421     RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1422   }
1423 }
1424 
WaitEpoll(int cmsWait)1425 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1426   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1427   int64_t tvWait = -1;
1428   int64_t tvStop = -1;
1429   if (cmsWait != kForever) {
1430     tvWait = cmsWait;
1431     tvStop = TimeAfter(cmsWait);
1432   }
1433 
1434   fWait_ = true;
1435   while (fWait_) {
1436     // Wait then call handlers as appropriate
1437     // < 0 means error
1438     // 0 means timeout
1439     // > 0 means count of descriptors ready
1440     int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
1441                        static_cast<int>(tvWait));
1442     if (n < 0) {
1443       if (errno != EINTR) {
1444         RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
1445         return false;
1446       }
1447       // Else ignore the error and keep going. If this EINTR was for one of the
1448       // signals managed by this PhysicalSocketServer, the
1449       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1450       // iteration.
1451     } else if (n == 0) {
1452       // If timeout, return success
1453       return true;
1454     } else {
1455       // We have signaled descriptors
1456       CritScope cr(&crit_);
1457       for (int i = 0; i < n; ++i) {
1458         const epoll_event& event = epoll_events_[i];
1459         Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1460         if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1461           // The dispatcher for this socket no longer exists.
1462           continue;
1463         }
1464 
1465         bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1466         bool writable = (event.events & EPOLLOUT);
1467         bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1468 
1469         ProcessEvents(pdispatcher, readable, writable, check_error);
1470       }
1471     }
1472 
1473     if (cmsWait != kForever) {
1474       tvWait = TimeDiff(tvStop, TimeMillis());
1475       if (tvWait < 0) {
1476         // Return success on timeout.
1477         return true;
1478       }
1479     }
1480   }
1481 
1482   return true;
1483 }
1484 
WaitPoll(int cmsWait,Dispatcher * dispatcher)1485 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1486   RTC_DCHECK(dispatcher);
1487   int64_t tvWait = -1;
1488   int64_t tvStop = -1;
1489   if (cmsWait != kForever) {
1490     tvWait = cmsWait;
1491     tvStop = TimeAfter(cmsWait);
1492   }
1493 
1494   fWait_ = true;
1495 
1496   struct pollfd fds = {0};
1497   int fd = dispatcher->GetDescriptor();
1498   fds.fd = fd;
1499 
1500   while (fWait_) {
1501     uint32_t ff = dispatcher->GetRequestedEvents();
1502     fds.events = 0;
1503     if (ff & (DE_READ | DE_ACCEPT)) {
1504       fds.events |= POLLIN;
1505     }
1506     if (ff & (DE_WRITE | DE_CONNECT)) {
1507       fds.events |= POLLOUT;
1508     }
1509     fds.revents = 0;
1510 
1511     // Wait then call handlers as appropriate
1512     // < 0 means error
1513     // 0 means timeout
1514     // > 0 means count of descriptors ready
1515     int n = poll(&fds, 1, static_cast<int>(tvWait));
1516     if (n < 0) {
1517       if (errno != EINTR) {
1518         RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
1519         return false;
1520       }
1521       // Else ignore the error and keep going. If this EINTR was for one of the
1522       // signals managed by this PhysicalSocketServer, the
1523       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1524       // iteration.
1525     } else if (n == 0) {
1526       // If timeout, return success
1527       return true;
1528     } else {
1529       // We have signaled descriptors (should only be the passed dispatcher).
1530       RTC_DCHECK_EQ(n, 1);
1531       RTC_DCHECK_EQ(fds.fd, fd);
1532 
1533       bool readable = (fds.revents & (POLLIN | POLLPRI));
1534       bool writable = (fds.revents & POLLOUT);
1535       bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1536 
1537       ProcessEvents(dispatcher, readable, writable, check_error);
1538     }
1539 
1540     if (cmsWait != kForever) {
1541       tvWait = TimeDiff(tvStop, TimeMillis());
1542       if (tvWait < 0) {
1543         // Return success on timeout.
1544         return true;
1545       }
1546     }
1547   }
1548 
1549   return true;
1550 }
1551 
1552 #endif  // WEBRTC_USE_EPOLL
1553 
1554 #endif  // WEBRTC_POSIX
1555 
1556 #if defined(WEBRTC_WIN)
Wait(int cmsWait,bool process_io)1557 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1558   int64_t cmsTotal = cmsWait;
1559   int64_t cmsElapsed = 0;
1560   int64_t msStart = Time();
1561 
1562   fWait_ = true;
1563   while (fWait_) {
1564     std::vector<WSAEVENT> events;
1565     std::vector<Dispatcher*> event_owners;
1566 
1567     events.push_back(socket_ev_);
1568 
1569     {
1570       CritScope cr(&crit_);
1571       // TODO(jbauch): Support re-entrant waiting.
1572       RTC_DCHECK(!processing_dispatchers_);
1573 
1574       // Calling "CheckSignalClose" might remove a closed dispatcher from the
1575       // set. This must be deferred to prevent invalidating the iterator.
1576       processing_dispatchers_ = true;
1577       for (Dispatcher* disp : dispatchers_) {
1578         if (!process_io && (disp != signal_wakeup_))
1579           continue;
1580         SOCKET s = disp->GetSocket();
1581         if (disp->CheckSignalClose()) {
1582           // We just signalled close, don't poll this socket
1583         } else if (s != INVALID_SOCKET) {
1584           WSAEventSelect(s, events[0],
1585                          FlagsToEvents(disp->GetRequestedEvents()));
1586         } else {
1587           events.push_back(disp->GetWSAEvent());
1588           event_owners.push_back(disp);
1589         }
1590       }
1591 
1592       processing_dispatchers_ = false;
1593       // Process deferred dispatchers that have been added/removed while the
1594       // events were handled above.
1595       AddRemovePendingDispatchers();
1596     }
1597 
1598     // Which is shorter, the delay wait or the asked wait?
1599 
1600     int64_t cmsNext;
1601     if (cmsWait == kForever) {
1602       cmsNext = cmsWait;
1603     } else {
1604       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
1605     }
1606 
1607     // Wait for one of the events to signal
1608     DWORD dw =
1609         WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
1610                                  false, static_cast<DWORD>(cmsNext), false);
1611 
1612     if (dw == WSA_WAIT_FAILED) {
1613       // Failed?
1614       // TODO(pthatcher): need a better strategy than this!
1615       WSAGetLastError();
1616       RTC_NOTREACHED();
1617       return false;
1618     } else if (dw == WSA_WAIT_TIMEOUT) {
1619       // Timeout?
1620       return true;
1621     } else {
1622       // Figure out which one it is and call it
1623       CritScope cr(&crit_);
1624       int index = dw - WSA_WAIT_EVENT_0;
1625       if (index > 0) {
1626         --index;  // The first event is the socket event
1627         Dispatcher* disp = event_owners[index];
1628         // The dispatcher could have been removed while waiting for events.
1629         if (dispatchers_.find(disp) != dispatchers_.end()) {
1630           disp->OnPreEvent(0);
1631           disp->OnEvent(0, 0);
1632         }
1633       } else if (process_io) {
1634         processing_dispatchers_ = true;
1635         for (Dispatcher* disp : dispatchers_) {
1636           SOCKET s = disp->GetSocket();
1637           if (s == INVALID_SOCKET)
1638             continue;
1639 
1640           WSANETWORKEVENTS wsaEvents;
1641           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1642           if (err == 0) {
1643             {
1644               if ((wsaEvents.lNetworkEvents & FD_READ) &&
1645                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1646                 RTC_LOG(WARNING)
1647                     << "PhysicalSocketServer got FD_READ_BIT error "
1648                     << wsaEvents.iErrorCode[FD_READ_BIT];
1649               }
1650               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1651                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1652                 RTC_LOG(WARNING)
1653                     << "PhysicalSocketServer got FD_WRITE_BIT error "
1654                     << wsaEvents.iErrorCode[FD_WRITE_BIT];
1655               }
1656               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1657                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1658                 RTC_LOG(WARNING)
1659                     << "PhysicalSocketServer got FD_CONNECT_BIT error "
1660                     << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1661               }
1662               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1663                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1664                 RTC_LOG(WARNING)
1665                     << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1666                     << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1667               }
1668               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1669                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1670                 RTC_LOG(WARNING)
1671                     << "PhysicalSocketServer got FD_CLOSE_BIT error "
1672                     << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1673               }
1674             }
1675             uint32_t ff = 0;
1676             int errcode = 0;
1677             if (wsaEvents.lNetworkEvents & FD_READ)
1678               ff |= DE_READ;
1679             if (wsaEvents.lNetworkEvents & FD_WRITE)
1680               ff |= DE_WRITE;
1681             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1682               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1683                 ff |= DE_CONNECT;
1684               } else {
1685                 ff |= DE_CLOSE;
1686                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1687               }
1688             }
1689             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1690               ff |= DE_ACCEPT;
1691             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1692               ff |= DE_CLOSE;
1693               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1694             }
1695             if (ff != 0) {
1696               disp->OnPreEvent(ff);
1697               disp->OnEvent(ff, errcode);
1698             }
1699           }
1700         }
1701 
1702         processing_dispatchers_ = false;
1703         // Process deferred dispatchers that have been added/removed while the
1704         // events were handled above.
1705         AddRemovePendingDispatchers();
1706       }
1707 
1708       // Reset the network event until new activity occurs
1709       WSAResetEvent(socket_ev_);
1710     }
1711 
1712     // Break?
1713     if (!fWait_)
1714       break;
1715     cmsElapsed = TimeSince(msStart);
1716     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1717       break;
1718     }
1719   }
1720 
1721   // Done
1722   return true;
1723 }
1724 #endif  // WEBRTC_WIN
1725 
1726 }  // namespace rtc
1727