1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "rtc_base/physical_socket_server.h"
11
12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable : 4786)
14 #endif
15
16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h>
18 #endif
19
20 #if defined(WEBRTC_POSIX)
21 #include <fcntl.h>
22 #include <string.h>
23 #if defined(WEBRTC_USE_EPOLL)
24 // "poll" will be used to wait for the signal dispatcher.
25 #include <poll.h>
26 #endif
27 #include <sys/ioctl.h>
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #endif
32
33 #if defined(WEBRTC_WIN)
34 #include <windows.h>
35 #include <winsock2.h>
36 #include <ws2tcpip.h>
37 #undef SetPort
38 #endif
39
40 #include <errno.h>
41
42 #include <algorithm>
43 #include <map>
44
45 #include "rtc_base/arraysize.h"
46 #include "rtc_base/byte_order.h"
47 #include "rtc_base/checks.h"
48 #include "rtc_base/logging.h"
49 #include "rtc_base/network_monitor.h"
50 #include "rtc_base/null_socket_server.h"
51 #include "rtc_base/time_utils.h"
52
53 #if defined(WEBRTC_LINUX)
54 #include <linux/sockios.h>
55 #endif
56
57 #if defined(WEBRTC_WIN)
58 #define LAST_SYSTEM_ERROR (::GetLastError())
59 #elif defined(__native_client__) && __native_client__
60 #define LAST_SYSTEM_ERROR (0)
61 #elif defined(WEBRTC_POSIX)
62 #define LAST_SYSTEM_ERROR (errno)
63 #endif // WEBRTC_WIN
64
65 #if defined(WEBRTC_POSIX)
66 #include <netinet/tcp.h> // for TCP_NODELAY
67 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
68 typedef void* SockOptArg;
69
70 #endif // WEBRTC_POSIX
71
72 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(__native_client__)
73
GetSocketRecvTimestamp(int socket)74 int64_t GetSocketRecvTimestamp(int socket) {
75 struct timeval tv_ioctl;
76 int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
77 if (ret != 0)
78 return -1;
79 int64_t timestamp =
80 rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) +
81 static_cast<int64_t>(tv_ioctl.tv_usec);
82 return timestamp;
83 }
84
85 #else
86
// Fallback for builds without the SIOCGSTAMP-based implementation: no
// receive timestamp is available, so always report -1.
int64_t GetSocketRecvTimestamp(int socket) {
  constexpr int64_t kTimestampUnavailable = -1;
  return kTimestampUnavailable;
}
90 #endif
91
92 #if defined(WEBRTC_WIN)
93 typedef char* SockOptArg;
94 #endif
95
96 #if defined(WEBRTC_USE_EPOLL)
97 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
98 #if !defined(POLLRDHUP)
99 #define POLLRDHUP 0x2000
100 #endif
101 #if !defined(EPOLLRDHUP)
102 #define EPOLLRDHUP 0x2000
103 #endif
104 #endif
105
106 namespace rtc {
107
CreateDefault()108 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
109 #if defined(__native_client__)
110 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
111 #else
112 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
113 #endif
114 }
115
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s)116 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
117 : ss_(ss),
118 s_(s),
119 error_(0),
120 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
121 resolver_(nullptr) {
122 if (s_ != INVALID_SOCKET) {
123 SetEnabledEvents(DE_READ | DE_WRITE);
124
125 int type = SOCK_STREAM;
126 socklen_t len = sizeof(type);
127 const int res =
128 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len);
129 RTC_DCHECK_EQ(0, res);
130 udp_ = (SOCK_DGRAM == type);
131 }
132 }
133
~PhysicalSocket()134 PhysicalSocket::~PhysicalSocket() {
135 Close();
136 }
137
Create(int family,int type)138 bool PhysicalSocket::Create(int family, int type) {
139 Close();
140 s_ = ::socket(family, type, 0);
141 udp_ = (SOCK_DGRAM == type);
142 family_ = family;
143 UpdateLastError();
144 if (udp_) {
145 SetEnabledEvents(DE_READ | DE_WRITE);
146 }
147 return s_ != INVALID_SOCKET;
148 }
149
GetLocalAddress() const150 SocketAddress PhysicalSocket::GetLocalAddress() const {
151 sockaddr_storage addr_storage = {};
152 socklen_t addrlen = sizeof(addr_storage);
153 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
154 int result = ::getsockname(s_, addr, &addrlen);
155 SocketAddress address;
156 if (result >= 0) {
157 SocketAddressFromSockAddrStorage(addr_storage, &address);
158 } else {
159 RTC_LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
160 << s_;
161 }
162 return address;
163 }
164
GetRemoteAddress() const165 SocketAddress PhysicalSocket::GetRemoteAddress() const {
166 sockaddr_storage addr_storage = {};
167 socklen_t addrlen = sizeof(addr_storage);
168 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
169 int result = ::getpeername(s_, addr, &addrlen);
170 SocketAddress address;
171 if (result >= 0) {
172 SocketAddressFromSockAddrStorage(addr_storage, &address);
173 } else {
174 RTC_LOG(LS_WARNING)
175 << "GetRemoteAddress: unable to get remote addr, socket=" << s_;
176 }
177 return address;
178 }
179
Bind(const SocketAddress & bind_addr)180 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
181 SocketAddress copied_bind_addr = bind_addr;
182 // If a network binder is available, use it to bind a socket to an interface
183 // instead of bind(), since this is more reliable on an OS with a weak host
184 // model.
185 if (ss_->network_binder() && !bind_addr.IsAnyIP()) {
186 NetworkBindingResult result =
187 ss_->network_binder()->BindSocketToNetwork(s_, bind_addr.ipaddr());
188 if (result == NetworkBindingResult::SUCCESS) {
189 // Since the network binder handled binding the socket to the desired
190 // network interface, we don't need to (and shouldn't) include an IP in
191 // the bind() call; bind() just needs to assign a port.
192 copied_bind_addr.SetIP(GetAnyIP(copied_bind_addr.ipaddr().family()));
193 } else if (result == NetworkBindingResult::NOT_IMPLEMENTED) {
194 RTC_LOG(LS_INFO) << "Can't bind socket to network because "
195 "network binding is not implemented for this OS.";
196 } else {
197 if (bind_addr.IsLoopbackIP()) {
198 // If we couldn't bind to a loopback IP (which should only happen in
199 // test scenarios), continue on. This may be expected behavior.
200 RTC_LOG(LS_VERBOSE) << "Binding socket to loopback address"
201 << " failed; result: " << static_cast<int>(result);
202 } else {
203 RTC_LOG(LS_WARNING) << "Binding socket to network address"
204 << " failed; result: " << static_cast<int>(result);
205 // If a network binding was attempted and failed, we should stop here
206 // and not try to use the socket. Otherwise, we may end up sending
207 // packets with an invalid source address.
208 // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7026
209 return -1;
210 }
211 }
212 }
213 sockaddr_storage addr_storage;
214 size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage);
215 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
216 int err = ::bind(s_, addr, static_cast<int>(len));
217 UpdateLastError();
218 #if !defined(NDEBUG)
219 if (0 == err) {
220 dbg_addr_ = "Bound @ ";
221 dbg_addr_.append(GetLocalAddress().ToString());
222 }
223 #endif
224 return err;
225 }
226
Connect(const SocketAddress & addr)227 int PhysicalSocket::Connect(const SocketAddress& addr) {
228 // TODO(pthatcher): Implicit creation is required to reconnect...
229 // ...but should we make it more explicit?
230 if (state_ != CS_CLOSED) {
231 SetError(EALREADY);
232 return SOCKET_ERROR;
233 }
234 if (addr.IsUnresolvedIP()) {
235 RTC_LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
236 resolver_ = new AsyncResolver();
237 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
238 resolver_->Start(addr);
239 state_ = CS_CONNECTING;
240 return 0;
241 }
242
243 return DoConnect(addr);
244 }
245
DoConnect(const SocketAddress & connect_addr)246 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
247 if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
248 return SOCKET_ERROR;
249 }
250 sockaddr_storage addr_storage;
251 size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
252 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
253 int err = ::connect(s_, addr, static_cast<int>(len));
254 UpdateLastError();
255 uint8_t events = DE_READ | DE_WRITE;
256 if (err == 0) {
257 state_ = CS_CONNECTED;
258 } else if (IsBlockingError(GetError())) {
259 state_ = CS_CONNECTING;
260 events |= DE_CONNECT;
261 } else {
262 return SOCKET_ERROR;
263 }
264
265 EnableEvents(events);
266 return 0;
267 }
268
GetError() const269 int PhysicalSocket::GetError() const {
270 CritScope cs(&crit_);
271 return error_;
272 }
273
SetError(int error)274 void PhysicalSocket::SetError(int error) {
275 CritScope cs(&crit_);
276 error_ = error;
277 }
278
GetState() const279 AsyncSocket::ConnState PhysicalSocket::GetState() const {
280 return state_;
281 }
282
GetOption(Option opt,int * value)283 int PhysicalSocket::GetOption(Option opt, int* value) {
284 int slevel;
285 int sopt;
286 if (TranslateOption(opt, &slevel, &sopt) == -1)
287 return -1;
288 socklen_t optlen = sizeof(*value);
289 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
290 if (ret == -1) {
291 return -1;
292 }
293 if (opt == OPT_DONTFRAGMENT) {
294 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
295 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
296 #endif
297 } else if (opt == OPT_DSCP) {
298 #if defined(WEBRTC_POSIX)
299 // unshift DSCP value to get six most significant bits of IP DiffServ field
300 *value >>= 2;
301 #endif
302 }
303 return ret;
304 }
305
SetOption(Option opt,int value)306 int PhysicalSocket::SetOption(Option opt, int value) {
307 int slevel;
308 int sopt;
309 if (TranslateOption(opt, &slevel, &sopt) == -1)
310 return -1;
311 if (opt == OPT_DONTFRAGMENT) {
312 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
313 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
314 #endif
315 } else if (opt == OPT_DSCP) {
316 #if defined(WEBRTC_POSIX)
317 // shift DSCP value to fit six most significant bits of IP DiffServ field
318 value <<= 2;
319 #endif
320 }
321 #if defined(WEBRTC_POSIX)
322 if (sopt == IPV6_TCLASS) {
323 // Set the IPv4 option in all cases to support dual-stack sockets.
324 ::setsockopt(s_, IPPROTO_IP, IP_TOS, (SockOptArg)&value, sizeof(value));
325 }
326 #endif
327 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
328 }
329
Send(const void * pv,size_t cb)330 int PhysicalSocket::Send(const void* pv, size_t cb) {
331 int sent = DoSend(
332 s_, reinterpret_cast<const char*>(pv), static_cast<int>(cb),
333 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
334 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
335 // other end is closed will result in a SIGPIPE signal being raised to
336 // our process, which by default will terminate the process, which we
337 // don't want. By specifying this flag, we'll just get the error EPIPE
338 // instead and can handle the error gracefully.
339 MSG_NOSIGNAL
340 #else
341 0
342 #endif
343 );
344 UpdateLastError();
345 MaybeRemapSendError();
346 // We have seen minidumps where this may be false.
347 RTC_DCHECK(sent <= static_cast<int>(cb));
348 if ((sent > 0 && sent < static_cast<int>(cb)) ||
349 (sent < 0 && IsBlockingError(GetError()))) {
350 EnableEvents(DE_WRITE);
351 }
352 return sent;
353 }
354
SendTo(const void * buffer,size_t length,const SocketAddress & addr)355 int PhysicalSocket::SendTo(const void* buffer,
356 size_t length,
357 const SocketAddress& addr) {
358 sockaddr_storage saddr;
359 size_t len = addr.ToSockAddrStorage(&saddr);
360 int sent =
361 DoSendTo(s_, static_cast<const char*>(buffer), static_cast<int>(length),
362 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
363 // Suppress SIGPIPE. See above for explanation.
364 MSG_NOSIGNAL,
365 #else
366 0,
367 #endif
368 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
369 UpdateLastError();
370 MaybeRemapSendError();
371 // We have seen minidumps where this may be false.
372 RTC_DCHECK(sent <= static_cast<int>(length));
373 if ((sent > 0 && sent < static_cast<int>(length)) ||
374 (sent < 0 && IsBlockingError(GetError()))) {
375 EnableEvents(DE_WRITE);
376 }
377 return sent;
378 }
379
Recv(void * buffer,size_t length,int64_t * timestamp)380 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
381 int received =
382 ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
383 if ((received == 0) && (length != 0)) {
384 // Note: on graceful shutdown, recv can return 0. In this case, we
385 // pretend it is blocking, and then signal close, so that simplifying
386 // assumptions can be made about Recv.
387 RTC_LOG(LS_WARNING) << "EOF from socket; deferring close event";
388 // Must turn this back on so that the select() loop will notice the close
389 // event.
390 EnableEvents(DE_READ);
391 SetError(EWOULDBLOCK);
392 return SOCKET_ERROR;
393 }
394 if (timestamp) {
395 *timestamp = GetSocketRecvTimestamp(s_);
396 }
397 UpdateLastError();
398 int error = GetError();
399 bool success = (received >= 0) || IsBlockingError(error);
400 if (udp_ || success) {
401 EnableEvents(DE_READ);
402 }
403 if (!success) {
404 RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
405 }
406 return received;
407 }
408
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr,int64_t * timestamp)409 int PhysicalSocket::RecvFrom(void* buffer,
410 size_t length,
411 SocketAddress* out_addr,
412 int64_t* timestamp) {
413 sockaddr_storage addr_storage;
414 socklen_t addr_len = sizeof(addr_storage);
415 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
416 int received = ::recvfrom(s_, static_cast<char*>(buffer),
417 static_cast<int>(length), 0, addr, &addr_len);
418 if (timestamp) {
419 *timestamp = GetSocketRecvTimestamp(s_);
420 }
421 UpdateLastError();
422 if ((received >= 0) && (out_addr != nullptr))
423 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
424 int error = GetError();
425 bool success = (received >= 0) || IsBlockingError(error);
426 if (udp_ || success) {
427 EnableEvents(DE_READ);
428 }
429 if (!success) {
430 RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
431 }
432 return received;
433 }
434
Listen(int backlog)435 int PhysicalSocket::Listen(int backlog) {
436 int err = ::listen(s_, backlog);
437 UpdateLastError();
438 if (err == 0) {
439 state_ = CS_CONNECTING;
440 EnableEvents(DE_ACCEPT);
441 #if !defined(NDEBUG)
442 dbg_addr_ = "Listening @ ";
443 dbg_addr_.append(GetLocalAddress().ToString());
444 #endif
445 }
446 return err;
447 }
448
Accept(SocketAddress * out_addr)449 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
450 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
451 // trigger an event even if DoAccept returns an error here.
452 EnableEvents(DE_ACCEPT);
453 sockaddr_storage addr_storage;
454 socklen_t addr_len = sizeof(addr_storage);
455 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
456 SOCKET s = DoAccept(s_, addr, &addr_len);
457 UpdateLastError();
458 if (s == INVALID_SOCKET)
459 return nullptr;
460 if (out_addr != nullptr)
461 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
462 return ss_->WrapSocket(s);
463 }
464
Close()465 int PhysicalSocket::Close() {
466 if (s_ == INVALID_SOCKET)
467 return 0;
468 int err = ::closesocket(s_);
469 UpdateLastError();
470 s_ = INVALID_SOCKET;
471 state_ = CS_CLOSED;
472 SetEnabledEvents(0);
473 if (resolver_) {
474 resolver_->Destroy(false);
475 resolver_ = nullptr;
476 }
477 return err;
478 }
479
DoAccept(SOCKET socket,sockaddr * addr,socklen_t * addrlen)480 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
481 sockaddr* addr,
482 socklen_t* addrlen) {
483 return ::accept(socket, addr, addrlen);
484 }
485
DoSend(SOCKET socket,const char * buf,int len,int flags)486 int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) {
487 return ::send(socket, buf, len, flags);
488 }
489
DoSendTo(SOCKET socket,const char * buf,int len,int flags,const struct sockaddr * dest_addr,socklen_t addrlen)490 int PhysicalSocket::DoSendTo(SOCKET socket,
491 const char* buf,
492 int len,
493 int flags,
494 const struct sockaddr* dest_addr,
495 socklen_t addrlen) {
496 return ::sendto(socket, buf, len, flags, dest_addr, addrlen);
497 }
498
OnResolveResult(AsyncResolverInterface * resolver)499 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
500 if (resolver != resolver_) {
501 return;
502 }
503
504 int error = resolver_->GetError();
505 if (error == 0) {
506 error = DoConnect(resolver_->address());
507 } else {
508 Close();
509 }
510
511 if (error) {
512 SetError(error);
513 SignalCloseEvent(this, error);
514 }
515 }
516
UpdateLastError()517 void PhysicalSocket::UpdateLastError() {
518 SetError(LAST_SYSTEM_ERROR);
519 }
520
MaybeRemapSendError()521 void PhysicalSocket::MaybeRemapSendError() {
522 #if defined(WEBRTC_MAC)
523 // https://developer.apple.com/library/mac/documentation/Darwin/
524 // Reference/ManPages/man2/sendto.2.html
525 // ENOBUFS - The output queue for a network interface is full.
526 // This generally indicates that the interface has stopped sending,
527 // but may be caused by transient congestion.
528 if (GetError() == ENOBUFS) {
529 SetError(EWOULDBLOCK);
530 }
531 #endif
532 }
533
SetEnabledEvents(uint8_t events)534 void PhysicalSocket::SetEnabledEvents(uint8_t events) {
535 enabled_events_ = events;
536 }
537
EnableEvents(uint8_t events)538 void PhysicalSocket::EnableEvents(uint8_t events) {
539 enabled_events_ |= events;
540 }
541
DisableEvents(uint8_t events)542 void PhysicalSocket::DisableEvents(uint8_t events) {
543 enabled_events_ &= ~events;
544 }
545
TranslateOption(Option opt,int * slevel,int * sopt)546 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
547 switch (opt) {
548 case OPT_DONTFRAGMENT:
549 #if defined(WEBRTC_WIN)
550 *slevel = IPPROTO_IP;
551 *sopt = IP_DONTFRAGMENT;
552 break;
553 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
554 RTC_LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
555 return -1;
556 #elif defined(WEBRTC_POSIX)
557 *slevel = IPPROTO_IP;
558 *sopt = IP_MTU_DISCOVER;
559 break;
560 #endif
561 case OPT_RCVBUF:
562 *slevel = SOL_SOCKET;
563 *sopt = SO_RCVBUF;
564 break;
565 case OPT_SNDBUF:
566 *slevel = SOL_SOCKET;
567 *sopt = SO_SNDBUF;
568 break;
569 case OPT_NODELAY:
570 *slevel = IPPROTO_TCP;
571 *sopt = TCP_NODELAY;
572 break;
573 case OPT_DSCP:
574 #if defined(WEBRTC_POSIX)
575 if (family_ == AF_INET6) {
576 *slevel = IPPROTO_IPV6;
577 *sopt = IPV6_TCLASS;
578 } else {
579 *slevel = IPPROTO_IP;
580 *sopt = IP_TOS;
581 }
582 break;
583 #else
584 RTC_LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
585 return -1;
586 #endif
587 case OPT_RTP_SENDTIME_EXTN_ID:
588 return -1; // No logging is necessary as this not a OS socket option.
589 default:
590 RTC_NOTREACHED();
591 return -1;
592 }
593 return 0;
594 }
595
SocketDispatcher(PhysicalSocketServer * ss)596 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
597 #if defined(WEBRTC_WIN)
598 : PhysicalSocket(ss),
599 id_(0),
600 signal_close_(false)
601 #else
602 : PhysicalSocket(ss)
603 #endif
604 {
605 }
606
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)607 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
608 #if defined(WEBRTC_WIN)
609 : PhysicalSocket(ss, s),
610 id_(0),
611 signal_close_(false)
612 #else
613 : PhysicalSocket(ss, s)
614 #endif
615 {
616 }
617
~SocketDispatcher()618 SocketDispatcher::~SocketDispatcher() {
619 Close();
620 }
621
Initialize()622 bool SocketDispatcher::Initialize() {
623 RTC_DCHECK(s_ != INVALID_SOCKET);
624 // Must be a non-blocking
625 #if defined(WEBRTC_WIN)
626 u_long argp = 1;
627 ioctlsocket(s_, FIONBIO, &argp);
628 #elif defined(WEBRTC_POSIX)
629 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
630 #endif
631 #if defined(WEBRTC_IOS)
632 // iOS may kill sockets when the app is moved to the background
633 // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When
634 // we attempt to write to such a socket, SIGPIPE will be raised, which by
635 // default will terminate the process, which we don't want. By specifying
636 // this socket option, SIGPIPE will be disabled for the socket.
637 int value = 1;
638 ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
639 #endif
640 ss_->Add(this);
641 return true;
642 }
643
Create(int type)644 bool SocketDispatcher::Create(int type) {
645 return Create(AF_INET, type);
646 }
647
Create(int family,int type)648 bool SocketDispatcher::Create(int family, int type) {
649 // Change the socket to be non-blocking.
650 if (!PhysicalSocket::Create(family, type))
651 return false;
652
653 if (!Initialize())
654 return false;
655
656 #if defined(WEBRTC_WIN)
657 do {
658 id_ = ++next_id_;
659 } while (id_ == 0);
660 #endif
661 return true;
662 }
663
664 #if defined(WEBRTC_WIN)
665
GetWSAEvent()666 WSAEVENT SocketDispatcher::GetWSAEvent() {
667 return WSA_INVALID_EVENT;
668 }
669
GetSocket()670 SOCKET SocketDispatcher::GetSocket() {
671 return s_;
672 }
673
CheckSignalClose()674 bool SocketDispatcher::CheckSignalClose() {
675 if (!signal_close_)
676 return false;
677
678 char ch;
679 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
680 return false;
681
682 state_ = CS_CLOSED;
683 signal_close_ = false;
684 SignalCloseEvent(this, signal_err_);
685 return true;
686 }
687
// Counter from which Create() draws dispatcher ids: it pre-increments this
// value and skips 0.
int SocketDispatcher::next_id_ = 0;
689
690 #elif defined(WEBRTC_POSIX)
691
GetDescriptor()692 int SocketDispatcher::GetDescriptor() {
693 return s_;
694 }
695
IsDescriptorClosed()696 bool SocketDispatcher::IsDescriptorClosed() {
697 if (udp_) {
698 // The MSG_PEEK trick doesn't work for UDP, since (at least in some
699 // circumstances) it requires reading an entire UDP packet, which would be
700 // bad for performance here. So, just check whether |s_| has been closed,
701 // which should be sufficient.
702 return s_ == INVALID_SOCKET;
703 }
704 // We don't have a reliable way of distinguishing end-of-stream
705 // from readability. So test on each readable call. Is this
706 // inefficient? Probably.
707 char ch;
708 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
709 if (res > 0) {
710 // Data available, so not closed.
711 return false;
712 } else if (res == 0) {
713 // EOF, so closed.
714 return true;
715 } else { // error
716 switch (errno) {
717 // Returned if we've already closed s_.
718 case EBADF:
719 // Returned during ungraceful peer shutdown.
720 case ECONNRESET:
721 return true;
722 // The normal blocking error; don't log anything.
723 case EWOULDBLOCK:
724 // Interrupted system call.
725 case EINTR:
726 return false;
727 default:
728 // Assume that all other errors are just blocking errors, meaning the
729 // connection is still good but we just can't read from it right now.
730 // This should only happen when connecting (and at most once), because
731 // in all other cases this function is only called if the file
732 // descriptor is already known to be in the readable state. However,
733 // it's not necessary a problem if we spuriously interpret a
734 // "connection lost"-type error as a blocking error, because typically
735 // the next recv() will get EOF, so we'll still eventually notice that
736 // the socket is closed.
737 RTC_LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
738 return false;
739 }
740 }
741 }
742
743 #endif // WEBRTC_POSIX
744
GetRequestedEvents()745 uint32_t SocketDispatcher::GetRequestedEvents() {
746 return enabled_events();
747 }
748
OnPreEvent(uint32_t ff)749 void SocketDispatcher::OnPreEvent(uint32_t ff) {
750 if ((ff & DE_CONNECT) != 0)
751 state_ = CS_CONNECTED;
752
753 #if defined(WEBRTC_WIN)
754 // We set CS_CLOSED from CheckSignalClose.
755 #elif defined(WEBRTC_POSIX)
756 if ((ff & DE_CLOSE) != 0)
757 state_ = CS_CLOSED;
758 #endif
759 }
760
761 #if defined(WEBRTC_WIN)
762
OnEvent(uint32_t ff,int err)763 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
764 int cache_id = id_;
765 // Make sure we deliver connect/accept first. Otherwise, consumers may see
766 // something like a READ followed by a CONNECT, which would be odd.
767 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
768 if (ff != DE_CONNECT)
769 RTC_LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
770 DisableEvents(DE_CONNECT);
771 #if !defined(NDEBUG)
772 dbg_addr_ = "Connected @ ";
773 dbg_addr_.append(GetRemoteAddress().ToString());
774 #endif
775 SignalConnectEvent(this);
776 }
777 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
778 DisableEvents(DE_ACCEPT);
779 SignalReadEvent(this);
780 }
781 if ((ff & DE_READ) != 0) {
782 DisableEvents(DE_READ);
783 SignalReadEvent(this);
784 }
785 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
786 DisableEvents(DE_WRITE);
787 SignalWriteEvent(this);
788 }
789 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
790 signal_close_ = true;
791 signal_err_ = err;
792 }
793 }
794
795 #elif defined(WEBRTC_POSIX)
796
OnEvent(uint32_t ff,int err)797 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
798 #if defined(WEBRTC_USE_EPOLL)
799 // Remember currently enabled events so we can combine multiple changes
800 // into one update call later.
801 // The signal handlers might re-enable events disabled here, so we can't
802 // keep a list of events to disable at the end of the method. This list
803 // would not be updated with the events enabled by the signal handlers.
804 StartBatchedEventUpdates();
805 #endif
806 // Make sure we deliver connect/accept first. Otherwise, consumers may see
807 // something like a READ followed by a CONNECT, which would be odd.
808 if ((ff & DE_CONNECT) != 0) {
809 DisableEvents(DE_CONNECT);
810 SignalConnectEvent(this);
811 }
812 if ((ff & DE_ACCEPT) != 0) {
813 DisableEvents(DE_ACCEPT);
814 SignalReadEvent(this);
815 }
816 if ((ff & DE_READ) != 0) {
817 DisableEvents(DE_READ);
818 SignalReadEvent(this);
819 }
820 if ((ff & DE_WRITE) != 0) {
821 DisableEvents(DE_WRITE);
822 SignalWriteEvent(this);
823 }
824 if ((ff & DE_CLOSE) != 0) {
825 // The socket is now dead to us, so stop checking it.
826 SetEnabledEvents(0);
827 SignalCloseEvent(this, err);
828 }
829 #if defined(WEBRTC_USE_EPOLL)
830 FinishBatchedEventUpdates();
831 #endif
832 }
833
834 #endif // WEBRTC_POSIX
835
836 #if defined(WEBRTC_USE_EPOLL)
837
GetEpollEvents(uint32_t ff)838 static int GetEpollEvents(uint32_t ff) {
839 int events = 0;
840 if (ff & (DE_READ | DE_ACCEPT)) {
841 events |= EPOLLIN;
842 }
843 if (ff & (DE_WRITE | DE_CONNECT)) {
844 events |= EPOLLOUT;
845 }
846 return events;
847 }
848
StartBatchedEventUpdates()849 void SocketDispatcher::StartBatchedEventUpdates() {
850 RTC_DCHECK_EQ(saved_enabled_events_, -1);
851 saved_enabled_events_ = enabled_events();
852 }
853
FinishBatchedEventUpdates()854 void SocketDispatcher::FinishBatchedEventUpdates() {
855 RTC_DCHECK_NE(saved_enabled_events_, -1);
856 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
857 saved_enabled_events_ = -1;
858 MaybeUpdateDispatcher(old_events);
859 }
860
MaybeUpdateDispatcher(uint8_t old_events)861 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
862 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
863 saved_enabled_events_ == -1) {
864 ss_->Update(this);
865 }
866 }
867
SetEnabledEvents(uint8_t events)868 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
869 uint8_t old_events = enabled_events();
870 PhysicalSocket::SetEnabledEvents(events);
871 MaybeUpdateDispatcher(old_events);
872 }
873
EnableEvents(uint8_t events)874 void SocketDispatcher::EnableEvents(uint8_t events) {
875 uint8_t old_events = enabled_events();
876 PhysicalSocket::EnableEvents(events);
877 MaybeUpdateDispatcher(old_events);
878 }
879
DisableEvents(uint8_t events)880 void SocketDispatcher::DisableEvents(uint8_t events) {
881 uint8_t old_events = enabled_events();
882 PhysicalSocket::DisableEvents(events);
883 MaybeUpdateDispatcher(old_events);
884 }
885
886 #endif // WEBRTC_USE_EPOLL
887
Close()888 int SocketDispatcher::Close() {
889 if (s_ == INVALID_SOCKET)
890 return 0;
891
892 #if defined(WEBRTC_WIN)
893 id_ = 0;
894 signal_close_ = false;
895 #endif
896 #if defined(WEBRTC_USE_EPOLL)
897 // If we're batching events, the socket can be closed and reopened
898 // during the batch. Set saved_enabled_events_ to 0 here so the new
899 // socket, if any, has the correct old events bitfield
900 if (saved_enabled_events_ != -1) {
901 saved_enabled_events_ = 0;
902 }
903 #endif
904 ss_->Remove(this);
905 return PhysicalSocket::Close();
906 }
907
908 #if defined(WEBRTC_POSIX)
909 class EventDispatcher : public Dispatcher {
910 public:
EventDispatcher(PhysicalSocketServer * ss)911 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
912 if (pipe(afd_) < 0)
913 RTC_LOG(LERROR) << "pipe failed";
914 ss_->Add(this);
915 }
916
~EventDispatcher()917 ~EventDispatcher() override {
918 ss_->Remove(this);
919 close(afd_[0]);
920 close(afd_[1]);
921 }
922
Signal()923 virtual void Signal() {
924 CritScope cs(&crit_);
925 if (!fSignaled_) {
926 const uint8_t b[1] = {0};
927 const ssize_t res = write(afd_[1], b, sizeof(b));
928 RTC_DCHECK_EQ(1, res);
929 fSignaled_ = true;
930 }
931 }
932
GetRequestedEvents()933 uint32_t GetRequestedEvents() override { return DE_READ; }
934
OnPreEvent(uint32_t ff)935 void OnPreEvent(uint32_t ff) override {
936 // It is not possible to perfectly emulate an auto-resetting event with
937 // pipes. This simulates it by resetting before the event is handled.
938
939 CritScope cs(&crit_);
940 if (fSignaled_) {
941 uint8_t b[4]; // Allow for reading more than 1 byte, but expect 1.
942 const ssize_t res = read(afd_[0], b, sizeof(b));
943 RTC_DCHECK_EQ(1, res);
944 fSignaled_ = false;
945 }
946 }
947
OnEvent(uint32_t ff,int err)948 void OnEvent(uint32_t ff, int err) override { RTC_NOTREACHED(); }
949
GetDescriptor()950 int GetDescriptor() override { return afd_[0]; }
951
IsDescriptorClosed()952 bool IsDescriptorClosed() override { return false; }
953
954 private:
955 PhysicalSocketServer* ss_;
956 int afd_[2];
957 bool fSignaled_;
958 RecursiveCriticalSection crit_;
959 };
960
961 #endif // WEBRTC_POSIX
962
963 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32_t events)964 static uint32_t FlagsToEvents(uint32_t events) {
965 uint32_t ffFD = FD_CLOSE;
966 if (events & DE_READ)
967 ffFD |= FD_READ;
968 if (events & DE_WRITE)
969 ffFD |= FD_WRITE;
970 if (events & DE_CONNECT)
971 ffFD |= FD_CONNECT;
972 if (events & DE_ACCEPT)
973 ffFD |= FD_ACCEPT;
974 return ffFD;
975 }
976
977 class EventDispatcher : public Dispatcher {
978 public:
EventDispatcher(PhysicalSocketServer * ss)979 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss) {
980 hev_ = WSACreateEvent();
981 if (hev_) {
982 ss_->Add(this);
983 }
984 }
985
~EventDispatcher()986 ~EventDispatcher() override {
987 if (hev_ != nullptr) {
988 ss_->Remove(this);
989 WSACloseEvent(hev_);
990 hev_ = nullptr;
991 }
992 }
993
Signal()994 virtual void Signal() {
995 if (hev_ != nullptr)
996 WSASetEvent(hev_);
997 }
998
GetRequestedEvents()999 uint32_t GetRequestedEvents() override { return 0; }
1000
OnPreEvent(uint32_t ff)1001 void OnPreEvent(uint32_t ff) override { WSAResetEvent(hev_); }
1002
OnEvent(uint32_t ff,int err)1003 void OnEvent(uint32_t ff, int err) override {}
1004
GetWSAEvent()1005 WSAEVENT GetWSAEvent() override { return hev_; }
1006
GetSocket()1007 SOCKET GetSocket() override { return INVALID_SOCKET; }
1008
CheckSignalClose()1009 bool CheckSignalClose() override { return false; }
1010
1011 private:
1012 PhysicalSocketServer* ss_;
1013 WSAEVENT hev_;
1014 };
1015 #endif // WEBRTC_WIN
1016
1017 // Sets the value of a boolean value to false when signaled.
1018 class Signaler : public EventDispatcher {
1019 public:
Signaler(PhysicalSocketServer * ss,bool * pf)1020 Signaler(PhysicalSocketServer* ss, bool* pf) : EventDispatcher(ss), pf_(pf) {}
~Signaler()1021 ~Signaler() override {}
1022
OnEvent(uint32_t ff,int err)1023 void OnEvent(uint32_t ff, int err) override {
1024 if (pf_)
1025 *pf_ = false;
1026 }
1027
1028 private:
1029 bool* pf_;
1030 };
1031
PhysicalSocketServer()1032 PhysicalSocketServer::PhysicalSocketServer()
1033 :
1034 #if defined(WEBRTC_USE_EPOLL)
1035 // Since Linux 2.6.8, the size argument is ignored, but must be greater
1036 // than zero. Before that the size served as hint to the kernel for the
1037 // amount of space to initially allocate in internal data structures.
1038 epoll_fd_(epoll_create(FD_SETSIZE)),
1039 #endif
1040 #if defined(WEBRTC_WIN)
1041 socket_ev_(WSACreateEvent()),
1042 #endif
1043 fWait_(false) {
1044 #if defined(WEBRTC_USE_EPOLL)
1045 if (epoll_fd_ == -1) {
1046 // Not an error, will fall back to "select" below.
1047 RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1048 // Note that -1 == INVALID_SOCKET, the alias used by later checks.
1049 }
1050 #endif
1051 signal_wakeup_ = new Signaler(this, &fWait_);
1052 }
1053
~PhysicalSocketServer()1054 PhysicalSocketServer::~PhysicalSocketServer() {
1055 #if defined(WEBRTC_WIN)
1056 WSACloseEvent(socket_ev_);
1057 #endif
1058 delete signal_wakeup_;
1059 #if defined(WEBRTC_USE_EPOLL)
1060 if (epoll_fd_ != INVALID_SOCKET) {
1061 close(epoll_fd_);
1062 }
1063 #endif
1064 RTC_DCHECK(dispatchers_.empty());
1065 }
1066
WakeUp()1067 void PhysicalSocketServer::WakeUp() {
1068 signal_wakeup_->Signal();
1069 }
1070
CreateSocket(int family,int type)1071 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1072 PhysicalSocket* socket = new PhysicalSocket(this);
1073 if (socket->Create(family, type)) {
1074 return socket;
1075 } else {
1076 delete socket;
1077 return nullptr;
1078 }
1079 }
1080
CreateAsyncSocket(int family,int type)1081 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1082 SocketDispatcher* dispatcher = new SocketDispatcher(this);
1083 if (dispatcher->Create(family, type)) {
1084 return dispatcher;
1085 } else {
1086 delete dispatcher;
1087 return nullptr;
1088 }
1089 }
1090
WrapSocket(SOCKET s)1091 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1092 SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1093 if (dispatcher->Initialize()) {
1094 return dispatcher;
1095 } else {
1096 delete dispatcher;
1097 return nullptr;
1098 }
1099 }
1100
Add(Dispatcher * pdispatcher)1101 void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
1102 CritScope cs(&crit_);
1103 if (processing_dispatchers_) {
1104 // A dispatcher is being added while a "Wait" call is processing the
1105 // list of socket events.
1106 // Defer adding to "dispatchers_" set until processing is done to avoid
1107 // invalidating the iterator in "Wait".
1108 pending_remove_dispatchers_.erase(pdispatcher);
1109 pending_add_dispatchers_.insert(pdispatcher);
1110 } else {
1111 dispatchers_.insert(pdispatcher);
1112 }
1113 #if defined(WEBRTC_USE_EPOLL)
1114 if (epoll_fd_ != INVALID_SOCKET) {
1115 AddEpoll(pdispatcher);
1116 }
1117 #endif // WEBRTC_USE_EPOLL
1118 }
1119
Remove(Dispatcher * pdispatcher)1120 void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
1121 CritScope cs(&crit_);
1122 if (processing_dispatchers_) {
1123 // A dispatcher is being removed while a "Wait" call is processing the
1124 // list of socket events.
1125 // Defer removal from "dispatchers_" set until processing is done to avoid
1126 // invalidating the iterator in "Wait".
1127 if (!pending_add_dispatchers_.erase(pdispatcher) &&
1128 dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1129 RTC_LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1130 "dispatcher, potentially from a duplicate call to "
1131 "Add.";
1132 return;
1133 }
1134
1135 pending_remove_dispatchers_.insert(pdispatcher);
1136 } else if (!dispatchers_.erase(pdispatcher)) {
1137 RTC_LOG(LS_WARNING)
1138 << "PhysicalSocketServer asked to remove a unknown "
1139 "dispatcher, potentially from a duplicate call to Add.";
1140 return;
1141 }
1142 #if defined(WEBRTC_USE_EPOLL)
1143 if (epoll_fd_ != INVALID_SOCKET) {
1144 RemoveEpoll(pdispatcher);
1145 }
1146 #endif // WEBRTC_USE_EPOLL
1147 }
1148
Update(Dispatcher * pdispatcher)1149 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
1150 #if defined(WEBRTC_USE_EPOLL)
1151 if (epoll_fd_ == INVALID_SOCKET) {
1152 return;
1153 }
1154
1155 CritScope cs(&crit_);
1156 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1157 return;
1158 }
1159
1160 UpdateEpoll(pdispatcher);
1161 #endif
1162 }
1163
AddRemovePendingDispatchers()1164 void PhysicalSocketServer::AddRemovePendingDispatchers() {
1165 if (!pending_add_dispatchers_.empty()) {
1166 for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
1167 dispatchers_.insert(pdispatcher);
1168 }
1169 pending_add_dispatchers_.clear();
1170 }
1171
1172 if (!pending_remove_dispatchers_.empty()) {
1173 for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
1174 dispatchers_.erase(pdispatcher);
1175 }
1176 pending_remove_dispatchers_.clear();
1177 }
1178 }
1179
1180 #if defined(WEBRTC_POSIX)
1181
Wait(int cmsWait,bool process_io)1182 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1183 #if defined(WEBRTC_USE_EPOLL)
1184 // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1185 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1186 // "select" to support sockets larger than FD_SETSIZE.
1187 if (!process_io) {
1188 return WaitPoll(cmsWait, signal_wakeup_);
1189 } else if (epoll_fd_ != INVALID_SOCKET) {
1190 return WaitEpoll(cmsWait);
1191 }
1192 #endif
1193 return WaitSelect(cmsWait, process_io);
1194 }
1195
ProcessEvents(Dispatcher * dispatcher,bool readable,bool writable,bool check_error)1196 static void ProcessEvents(Dispatcher* dispatcher,
1197 bool readable,
1198 bool writable,
1199 bool check_error) {
1200 int errcode = 0;
1201 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1202 if (check_error) {
1203 socklen_t len = sizeof(errcode);
1204 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1205 &len);
1206 }
1207
1208 uint32_t ff = 0;
1209
1210 // Check readable descriptors. If we're waiting on an accept, signal
1211 // that. Otherwise we're waiting for data, check to see if we're
1212 // readable or really closed.
1213 // TODO(pthatcher): Only peek at TCP descriptors.
1214 if (readable) {
1215 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
1216 ff |= DE_ACCEPT;
1217 } else if (errcode || dispatcher->IsDescriptorClosed()) {
1218 ff |= DE_CLOSE;
1219 } else {
1220 ff |= DE_READ;
1221 }
1222 }
1223
1224 // Check writable descriptors. If we're waiting on a connect, detect
1225 // success versus failure by the reaped error code.
1226 if (writable) {
1227 if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
1228 if (!errcode) {
1229 ff |= DE_CONNECT;
1230 } else {
1231 ff |= DE_CLOSE;
1232 }
1233 } else {
1234 ff |= DE_WRITE;
1235 }
1236 }
1237
1238 // Tell the descriptor about the event.
1239 if (ff != 0) {
1240 dispatcher->OnPreEvent(ff);
1241 dispatcher->OnEvent(ff, errcode);
1242 }
1243 }
1244
WaitSelect(int cmsWait,bool process_io)1245 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1246 // Calculate timing information
1247
1248 struct timeval* ptvWait = nullptr;
1249 struct timeval tvWait;
1250 int64_t stop_us;
1251 if (cmsWait != kForever) {
1252 // Calculate wait timeval
1253 tvWait.tv_sec = cmsWait / 1000;
1254 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1255 ptvWait = &tvWait;
1256
1257 // Calculate when to return
1258 stop_us = rtc::TimeMicros() + cmsWait * 1000;
1259 }
1260
1261 // Zero all fd_sets. Don't need to do this inside the loop since
1262 // select() zeros the descriptors not signaled
1263
1264 fd_set fdsRead;
1265 FD_ZERO(&fdsRead);
1266 fd_set fdsWrite;
1267 FD_ZERO(&fdsWrite);
1268 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
1269 // inline assembly in FD_ZERO.
1270 // http://crbug.com/344505
1271 #ifdef MEMORY_SANITIZER
1272 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1273 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1274 #endif
1275
1276 fWait_ = true;
1277
1278 while (fWait_) {
1279 int fdmax = -1;
1280 {
1281 CritScope cr(&crit_);
1282 // TODO(jbauch): Support re-entrant waiting.
1283 RTC_DCHECK(!processing_dispatchers_);
1284 for (Dispatcher* pdispatcher : dispatchers_) {
1285 // Query dispatchers for read and write wait state
1286 RTC_DCHECK(pdispatcher);
1287 if (!process_io && (pdispatcher != signal_wakeup_))
1288 continue;
1289 int fd = pdispatcher->GetDescriptor();
1290 // "select"ing a file descriptor that is equal to or larger than
1291 // FD_SETSIZE will result in undefined behavior.
1292 RTC_DCHECK_LT(fd, FD_SETSIZE);
1293 if (fd > fdmax)
1294 fdmax = fd;
1295
1296 uint32_t ff = pdispatcher->GetRequestedEvents();
1297 if (ff & (DE_READ | DE_ACCEPT))
1298 FD_SET(fd, &fdsRead);
1299 if (ff & (DE_WRITE | DE_CONNECT))
1300 FD_SET(fd, &fdsWrite);
1301 }
1302 }
1303
1304 // Wait then call handlers as appropriate
1305 // < 0 means error
1306 // 0 means timeout
1307 // > 0 means count of descriptors ready
1308 int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait);
1309
1310 // If error, return error.
1311 if (n < 0) {
1312 if (errno != EINTR) {
1313 RTC_LOG_E(LS_ERROR, EN, errno) << "select";
1314 return false;
1315 }
1316 // Else ignore the error and keep going. If this EINTR was for one of the
1317 // signals managed by this PhysicalSocketServer, the
1318 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1319 // iteration.
1320 } else if (n == 0) {
1321 // If timeout, return success
1322 return true;
1323 } else {
1324 // We have signaled descriptors
1325 CritScope cr(&crit_);
1326 processing_dispatchers_ = true;
1327 for (Dispatcher* pdispatcher : dispatchers_) {
1328 int fd = pdispatcher->GetDescriptor();
1329
1330 bool readable = FD_ISSET(fd, &fdsRead);
1331 if (readable) {
1332 FD_CLR(fd, &fdsRead);
1333 }
1334
1335 bool writable = FD_ISSET(fd, &fdsWrite);
1336 if (writable) {
1337 FD_CLR(fd, &fdsWrite);
1338 }
1339
1340 // The error code can be signaled through reads or writes.
1341 ProcessEvents(pdispatcher, readable, writable, readable || writable);
1342 }
1343
1344 processing_dispatchers_ = false;
1345 // Process deferred dispatchers that have been added/removed while the
1346 // events were handled above.
1347 AddRemovePendingDispatchers();
1348 }
1349
1350 // Recalc the time remaining to wait. Doing it here means it doesn't get
1351 // calced twice the first time through the loop
1352 if (ptvWait) {
1353 ptvWait->tv_sec = 0;
1354 ptvWait->tv_usec = 0;
1355 int64_t time_left_us = stop_us - rtc::TimeMicros();
1356 if (time_left_us > 0) {
1357 ptvWait->tv_sec = time_left_us / rtc::kNumMicrosecsPerSec;
1358 ptvWait->tv_usec = time_left_us % rtc::kNumMicrosecsPerSec;
1359 }
1360 }
1361 }
1362
1363 return true;
1364 }
1365
1366 #if defined(WEBRTC_USE_EPOLL)
1367
AddEpoll(Dispatcher * pdispatcher)1368 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1369 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1370 int fd = pdispatcher->GetDescriptor();
1371 RTC_DCHECK(fd != INVALID_SOCKET);
1372 if (fd == INVALID_SOCKET) {
1373 return;
1374 }
1375
1376 struct epoll_event event = {0};
1377 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1378 event.data.ptr = pdispatcher;
1379 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1380 RTC_DCHECK_EQ(err, 0);
1381 if (err == -1) {
1382 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1383 }
1384 }
1385
RemoveEpoll(Dispatcher * pdispatcher)1386 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1387 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1388 int fd = pdispatcher->GetDescriptor();
1389 RTC_DCHECK(fd != INVALID_SOCKET);
1390 if (fd == INVALID_SOCKET) {
1391 return;
1392 }
1393
1394 struct epoll_event event = {0};
1395 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1396 RTC_DCHECK(err == 0 || errno == ENOENT);
1397 if (err == -1) {
1398 if (errno == ENOENT) {
1399 // Socket has already been closed.
1400 RTC_LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1401 } else {
1402 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1403 }
1404 }
1405 }
1406
UpdateEpoll(Dispatcher * pdispatcher)1407 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1408 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1409 int fd = pdispatcher->GetDescriptor();
1410 RTC_DCHECK(fd != INVALID_SOCKET);
1411 if (fd == INVALID_SOCKET) {
1412 return;
1413 }
1414
1415 struct epoll_event event = {0};
1416 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1417 event.data.ptr = pdispatcher;
1418 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1419 RTC_DCHECK_EQ(err, 0);
1420 if (err == -1) {
1421 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1422 }
1423 }
1424
WaitEpoll(int cmsWait)1425 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1426 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1427 int64_t tvWait = -1;
1428 int64_t tvStop = -1;
1429 if (cmsWait != kForever) {
1430 tvWait = cmsWait;
1431 tvStop = TimeAfter(cmsWait);
1432 }
1433
1434 fWait_ = true;
1435 while (fWait_) {
1436 // Wait then call handlers as appropriate
1437 // < 0 means error
1438 // 0 means timeout
1439 // > 0 means count of descriptors ready
1440 int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
1441 static_cast<int>(tvWait));
1442 if (n < 0) {
1443 if (errno != EINTR) {
1444 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
1445 return false;
1446 }
1447 // Else ignore the error and keep going. If this EINTR was for one of the
1448 // signals managed by this PhysicalSocketServer, the
1449 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1450 // iteration.
1451 } else if (n == 0) {
1452 // If timeout, return success
1453 return true;
1454 } else {
1455 // We have signaled descriptors
1456 CritScope cr(&crit_);
1457 for (int i = 0; i < n; ++i) {
1458 const epoll_event& event = epoll_events_[i];
1459 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1460 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1461 // The dispatcher for this socket no longer exists.
1462 continue;
1463 }
1464
1465 bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1466 bool writable = (event.events & EPOLLOUT);
1467 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1468
1469 ProcessEvents(pdispatcher, readable, writable, check_error);
1470 }
1471 }
1472
1473 if (cmsWait != kForever) {
1474 tvWait = TimeDiff(tvStop, TimeMillis());
1475 if (tvWait < 0) {
1476 // Return success on timeout.
1477 return true;
1478 }
1479 }
1480 }
1481
1482 return true;
1483 }
1484
WaitPoll(int cmsWait,Dispatcher * dispatcher)1485 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1486 RTC_DCHECK(dispatcher);
1487 int64_t tvWait = -1;
1488 int64_t tvStop = -1;
1489 if (cmsWait != kForever) {
1490 tvWait = cmsWait;
1491 tvStop = TimeAfter(cmsWait);
1492 }
1493
1494 fWait_ = true;
1495
1496 struct pollfd fds = {0};
1497 int fd = dispatcher->GetDescriptor();
1498 fds.fd = fd;
1499
1500 while (fWait_) {
1501 uint32_t ff = dispatcher->GetRequestedEvents();
1502 fds.events = 0;
1503 if (ff & (DE_READ | DE_ACCEPT)) {
1504 fds.events |= POLLIN;
1505 }
1506 if (ff & (DE_WRITE | DE_CONNECT)) {
1507 fds.events |= POLLOUT;
1508 }
1509 fds.revents = 0;
1510
1511 // Wait then call handlers as appropriate
1512 // < 0 means error
1513 // 0 means timeout
1514 // > 0 means count of descriptors ready
1515 int n = poll(&fds, 1, static_cast<int>(tvWait));
1516 if (n < 0) {
1517 if (errno != EINTR) {
1518 RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
1519 return false;
1520 }
1521 // Else ignore the error and keep going. If this EINTR was for one of the
1522 // signals managed by this PhysicalSocketServer, the
1523 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1524 // iteration.
1525 } else if (n == 0) {
1526 // If timeout, return success
1527 return true;
1528 } else {
1529 // We have signaled descriptors (should only be the passed dispatcher).
1530 RTC_DCHECK_EQ(n, 1);
1531 RTC_DCHECK_EQ(fds.fd, fd);
1532
1533 bool readable = (fds.revents & (POLLIN | POLLPRI));
1534 bool writable = (fds.revents & POLLOUT);
1535 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1536
1537 ProcessEvents(dispatcher, readable, writable, check_error);
1538 }
1539
1540 if (cmsWait != kForever) {
1541 tvWait = TimeDiff(tvStop, TimeMillis());
1542 if (tvWait < 0) {
1543 // Return success on timeout.
1544 return true;
1545 }
1546 }
1547 }
1548
1549 return true;
1550 }
1551
1552 #endif // WEBRTC_USE_EPOLL
1553
1554 #endif // WEBRTC_POSIX
1555
1556 #if defined(WEBRTC_WIN)
Wait(int cmsWait,bool process_io)1557 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1558 int64_t cmsTotal = cmsWait;
1559 int64_t cmsElapsed = 0;
1560 int64_t msStart = Time();
1561
1562 fWait_ = true;
1563 while (fWait_) {
1564 std::vector<WSAEVENT> events;
1565 std::vector<Dispatcher*> event_owners;
1566
1567 events.push_back(socket_ev_);
1568
1569 {
1570 CritScope cr(&crit_);
1571 // TODO(jbauch): Support re-entrant waiting.
1572 RTC_DCHECK(!processing_dispatchers_);
1573
1574 // Calling "CheckSignalClose" might remove a closed dispatcher from the
1575 // set. This must be deferred to prevent invalidating the iterator.
1576 processing_dispatchers_ = true;
1577 for (Dispatcher* disp : dispatchers_) {
1578 if (!process_io && (disp != signal_wakeup_))
1579 continue;
1580 SOCKET s = disp->GetSocket();
1581 if (disp->CheckSignalClose()) {
1582 // We just signalled close, don't poll this socket
1583 } else if (s != INVALID_SOCKET) {
1584 WSAEventSelect(s, events[0],
1585 FlagsToEvents(disp->GetRequestedEvents()));
1586 } else {
1587 events.push_back(disp->GetWSAEvent());
1588 event_owners.push_back(disp);
1589 }
1590 }
1591
1592 processing_dispatchers_ = false;
1593 // Process deferred dispatchers that have been added/removed while the
1594 // events were handled above.
1595 AddRemovePendingDispatchers();
1596 }
1597
1598 // Which is shorter, the delay wait or the asked wait?
1599
1600 int64_t cmsNext;
1601 if (cmsWait == kForever) {
1602 cmsNext = cmsWait;
1603 } else {
1604 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
1605 }
1606
1607 // Wait for one of the events to signal
1608 DWORD dw =
1609 WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
1610 false, static_cast<DWORD>(cmsNext), false);
1611
1612 if (dw == WSA_WAIT_FAILED) {
1613 // Failed?
1614 // TODO(pthatcher): need a better strategy than this!
1615 WSAGetLastError();
1616 RTC_NOTREACHED();
1617 return false;
1618 } else if (dw == WSA_WAIT_TIMEOUT) {
1619 // Timeout?
1620 return true;
1621 } else {
1622 // Figure out which one it is and call it
1623 CritScope cr(&crit_);
1624 int index = dw - WSA_WAIT_EVENT_0;
1625 if (index > 0) {
1626 --index; // The first event is the socket event
1627 Dispatcher* disp = event_owners[index];
1628 // The dispatcher could have been removed while waiting for events.
1629 if (dispatchers_.find(disp) != dispatchers_.end()) {
1630 disp->OnPreEvent(0);
1631 disp->OnEvent(0, 0);
1632 }
1633 } else if (process_io) {
1634 processing_dispatchers_ = true;
1635 for (Dispatcher* disp : dispatchers_) {
1636 SOCKET s = disp->GetSocket();
1637 if (s == INVALID_SOCKET)
1638 continue;
1639
1640 WSANETWORKEVENTS wsaEvents;
1641 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1642 if (err == 0) {
1643 {
1644 if ((wsaEvents.lNetworkEvents & FD_READ) &&
1645 wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1646 RTC_LOG(WARNING)
1647 << "PhysicalSocketServer got FD_READ_BIT error "
1648 << wsaEvents.iErrorCode[FD_READ_BIT];
1649 }
1650 if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1651 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1652 RTC_LOG(WARNING)
1653 << "PhysicalSocketServer got FD_WRITE_BIT error "
1654 << wsaEvents.iErrorCode[FD_WRITE_BIT];
1655 }
1656 if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1657 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1658 RTC_LOG(WARNING)
1659 << "PhysicalSocketServer got FD_CONNECT_BIT error "
1660 << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1661 }
1662 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1663 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1664 RTC_LOG(WARNING)
1665 << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1666 << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1667 }
1668 if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1669 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1670 RTC_LOG(WARNING)
1671 << "PhysicalSocketServer got FD_CLOSE_BIT error "
1672 << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1673 }
1674 }
1675 uint32_t ff = 0;
1676 int errcode = 0;
1677 if (wsaEvents.lNetworkEvents & FD_READ)
1678 ff |= DE_READ;
1679 if (wsaEvents.lNetworkEvents & FD_WRITE)
1680 ff |= DE_WRITE;
1681 if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1682 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1683 ff |= DE_CONNECT;
1684 } else {
1685 ff |= DE_CLOSE;
1686 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1687 }
1688 }
1689 if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1690 ff |= DE_ACCEPT;
1691 if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1692 ff |= DE_CLOSE;
1693 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1694 }
1695 if (ff != 0) {
1696 disp->OnPreEvent(ff);
1697 disp->OnEvent(ff, errcode);
1698 }
1699 }
1700 }
1701
1702 processing_dispatchers_ = false;
1703 // Process deferred dispatchers that have been added/removed while the
1704 // events were handled above.
1705 AddRemovePendingDispatchers();
1706 }
1707
1708 // Reset the network event until new activity occurs
1709 WSAResetEvent(socket_ev_);
1710 }
1711
1712 // Break?
1713 if (!fWait_)
1714 break;
1715 cmsElapsed = TimeSince(msStart);
1716 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1717 break;
1718 }
1719 }
1720
1721 // Done
1722 return true;
1723 }
1724 #endif // WEBRTC_WIN
1725
1726 } // namespace rtc
1727