1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #if defined(_MSC_VER) && _MSC_VER < 1300
29 #pragma warning(disable:4786)
30 #endif
31
32 #include <cassert>
33
34 #ifdef POSIX
35 #include <string.h>
36 #include <errno.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <unistd.h>
40 #include <signal.h>
41 #endif
42
43 #ifdef WIN32
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <winsock2.h>
47 #include <ws2tcpip.h>
48 #undef SetPort
49 #endif
50
51 #include <algorithm>
52 #include <map>
53
54 #include "talk/base/basictypes.h"
55 #include "talk/base/byteorder.h"
56 #include "talk/base/common.h"
57 #include "talk/base/logging.h"
58 #include "talk/base/nethelpers.h"
59 #include "talk/base/physicalsocketserver.h"
60 #include "talk/base/time.h"
61 #include "talk/base/winping.h"
62 #include "talk/base/win32socketinit.h"
63
64 // stm: this will tell us if we are on OSX
65 #ifdef HAVE_CONFIG_H
66 #include "config.h"
67 #endif
68
69 #ifdef POSIX
70 #include <netinet/tcp.h> // for TCP_NODELAY
71 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
72 typedef void* SockOptArg;
73 #endif // POSIX
74
75 #ifdef WIN32
76 typedef char* SockOptArg;
77 #endif
78
79 namespace talk_base {
80
// Standard MTUs, from RFC 1191.
// The entries are listed from largest to smallest and the table is terminated
// by a 0 marker. The Windows branch of PhysicalSocket::EstimateMTU walks this
// table from the top, pinging with each size until one is not reported as too
// large. Commented-out entries are deliberately excluded from the table.
const uint16 PACKET_MAXIMUMS[] = {
  65535,    // Theoretical maximum, Hyperchannel
  32000,    // Nothing
  17914,    // 16Mb IBM Token Ring
  8166,     // IEEE 802.4
  //4464,   // IEEE 802.5 (4Mb max)
  4352,     // FDDI
  //2048,   // Wideband Network
  2002,     // IEEE 802.5 (4Mb recommended)
  //1536,   // Experimental Ethernet Networks
  //1500,   // Ethernet, Point-to-Point (default)
  1492,     // IEEE 802.3
  1006,     // SLIP, ARPANET
  //576,    // X.25 Networks
  //544,    // DEC IP Portal
  //512,    // NETBIOS
  508,      // IEEE 802/Source-Rt Bridge, ARCNET
  296,      // Point-to-Point (low delay)
  68,       // Official minimum
  0,        // End of list marker
};

// Header sizes (in bytes) that EstimateMTU subtracts from a candidate MTU to
// obtain the ping payload size.
const uint32 IP_HEADER_SIZE = 20;
const uint32 ICMP_HEADER_SIZE = 8;
106
107 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
108 public:
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s=INVALID_SOCKET)109 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
110 : ss_(ss), s_(s), enabled_events_(0), error_(0),
111 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
112 resolver_(NULL) {
113 #ifdef WIN32
114 // EnsureWinsockInit() ensures that winsock is initialized. The default
115 // version of this function doesn't do anything because winsock is
116 // initialized by constructor of a static object. If neccessary libjingle
117 // users can link it with a different version of this function by replacing
118 // win32socketinit.cc. See win32socketinit.cc for more details.
119 EnsureWinsockInit();
120 #endif
121 if (s_ != INVALID_SOCKET) {
122 enabled_events_ = DE_READ | DE_WRITE;
123
124 int type = SOCK_STREAM;
125 socklen_t len = sizeof(type);
126 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
127 udp_ = (SOCK_DGRAM == type);
128 }
129 }
130
~PhysicalSocket()131 virtual ~PhysicalSocket() {
132 Close();
133 }
134
135 // Creates the underlying OS socket (same as the "socket" function).
Create(int type)136 virtual bool Create(int type) {
137 Close();
138 s_ = ::socket(AF_INET, type, 0);
139 udp_ = (SOCK_DGRAM == type);
140 UpdateLastError();
141 if (udp_)
142 enabled_events_ = DE_READ | DE_WRITE;
143 return s_ != INVALID_SOCKET;
144 }
145
GetLocalAddress() const146 SocketAddress GetLocalAddress() const {
147 sockaddr_in addr;
148 socklen_t addrlen = sizeof(addr);
149 int result = ::getsockname(s_, (sockaddr*)&addr, &addrlen);
150 SocketAddress address;
151 if (result >= 0) {
152 ASSERT(addrlen == sizeof(addr));
153 address.FromSockAddr(addr);
154 } else {
155 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
156 << s_;
157 }
158 return address;
159 }
160
GetRemoteAddress() const161 SocketAddress GetRemoteAddress() const {
162 sockaddr_in addr;
163 socklen_t addrlen = sizeof(addr);
164 int result = ::getpeername(s_, (sockaddr*)&addr, &addrlen);
165 SocketAddress address;
166 if (result >= 0) {
167 ASSERT(addrlen == sizeof(addr));
168 address.FromSockAddr(addr);
169 } else {
170 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
171 << s_;
172 }
173 return address;
174 }
175
Bind(const SocketAddress & addr)176 int Bind(const SocketAddress& addr) {
177 sockaddr_in saddr;
178 addr.ToSockAddr(&saddr);
179 int err = ::bind(s_, (sockaddr*)&saddr, sizeof(saddr));
180 UpdateLastError();
181 #ifdef _DEBUG
182 if (0 == err) {
183 dbg_addr_ = "Bound @ ";
184 dbg_addr_.append(GetLocalAddress().ToString());
185 }
186 #endif // _DEBUG
187 return err;
188 }
189
Connect(const SocketAddress & addr)190 int Connect(const SocketAddress& addr) {
191 // TODO: Implicit creation is required to reconnect...
192 // ...but should we make it more explicit?
193 if ((s_ == INVALID_SOCKET) && !Create(SOCK_STREAM))
194 return SOCKET_ERROR;
195 if (addr.IsUnresolved()) {
196 if (state_ != CS_CLOSED) {
197 SetError(EALREADY);
198 return SOCKET_ERROR;
199 }
200
201 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
202 resolver_ = new AsyncResolver();
203 resolver_->set_address(addr);
204 resolver_->SignalWorkDone.connect(this, &PhysicalSocket::OnResolveResult);
205 resolver_->Start();
206 state_ = CS_CONNECTING;
207 return 0;
208 }
209
210 return DoConnect(addr);
211 }
212
DoConnect(const SocketAddress & addr)213 int DoConnect(const SocketAddress& addr) {
214 sockaddr_in saddr;
215 addr.ToSockAddr(&saddr);
216 int err = ::connect(s_, (sockaddr*)&saddr, sizeof(saddr));
217 UpdateLastError();
218 if (err == 0) {
219 state_ = CS_CONNECTED;
220 } else if (IsBlockingError(error_)) {
221 state_ = CS_CONNECTING;
222 enabled_events_ |= DE_CONNECT;
223 } else {
224 return SOCKET_ERROR;
225 }
226
227 enabled_events_ |= DE_READ | DE_WRITE;
228 return 0;
229 }
230
GetError() const231 int GetError() const {
232 return error_;
233 }
234
SetError(int error)235 void SetError(int error) {
236 error_ = error;
237 }
238
GetState() const239 ConnState GetState() const {
240 return state_;
241 }
242
GetOption(Option opt,int * value)243 int GetOption(Option opt, int* value) {
244 int slevel;
245 int sopt;
246 if (TranslateOption(opt, &slevel, &sopt) == -1)
247 return -1;
248 socklen_t optlen = sizeof(*value);
249 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
250 if (ret != -1 && opt == OPT_DONTFRAGMENT) {
251 #ifdef LINUX
252 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
253 #endif
254 }
255 return ret;
256 }
257
SetOption(Option opt,int value)258 int SetOption(Option opt, int value) {
259 int slevel;
260 int sopt;
261 if (TranslateOption(opt, &slevel, &sopt) == -1)
262 return -1;
263 if (opt == OPT_DONTFRAGMENT) {
264 #ifdef LINUX
265 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
266 #endif
267 }
268 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
269 }
270
Send(const void * pv,size_t cb)271 int Send(const void *pv, size_t cb) {
272 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
273 #ifdef LINUX
274 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
275 // other end is closed will result in a SIGPIPE signal being raised to
276 // our process, which by default will terminate the process, which we
277 // don't want. By specifying this flag, we'll just get the error EPIPE
278 // instead and can handle the error gracefully.
279 MSG_NOSIGNAL
280 #else
281 0
282 #endif
283 );
284 UpdateLastError();
285 // We have seen minidumps where this may be false.
286 ASSERT(sent <= static_cast<int>(cb));
287 if ((sent < 0) && IsBlockingError(error_)) {
288 enabled_events_ |= DE_WRITE;
289 }
290 return sent;
291 }
292
SendTo(const void * pv,size_t cb,const SocketAddress & addr)293 int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
294 sockaddr_in saddr;
295 addr.ToSockAddr(&saddr);
296 int sent = ::sendto(
297 s_, (const char *)pv, (int)cb,
298 #ifdef LINUX
299 // Suppress SIGPIPE. See above for explanation.
300 MSG_NOSIGNAL,
301 #else
302 0,
303 #endif
304 (sockaddr*)&saddr, sizeof(saddr));
305 UpdateLastError();
306 // We have seen minidumps where this may be false.
307 ASSERT(sent <= static_cast<int>(cb));
308 if ((sent < 0) && IsBlockingError(error_)) {
309 enabled_events_ |= DE_WRITE;
310 }
311 return sent;
312 }
313
Recv(void * pv,size_t cb)314 int Recv(void *pv, size_t cb) {
315 int received = ::recv(s_, (char *)pv, (int)cb, 0);
316 if ((received == 0) && (cb != 0)) {
317 // Note: on graceful shutdown, recv can return 0. In this case, we
318 // pretend it is blocking, and then signal close, so that simplifying
319 // assumptions can be made about Recv.
320 LOG(LS_WARNING) << "EOF from socket; deferring close event";
321 // Must turn this back on so that the select() loop will notice the close
322 // event.
323 enabled_events_ |= DE_READ;
324 error_ = EWOULDBLOCK;
325 return SOCKET_ERROR;
326 }
327 UpdateLastError();
328 bool success = (received >= 0) || IsBlockingError(error_);
329 if (udp_ || success) {
330 enabled_events_ |= DE_READ;
331 }
332 if (!success) {
333 LOG_F(LS_VERBOSE) << "Error = " << error_;
334 }
335 return received;
336 }
337
RecvFrom(void * pv,size_t cb,SocketAddress * paddr)338 int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
339 sockaddr_in saddr;
340 socklen_t cbAddr = sizeof(saddr);
341 int received = ::recvfrom(s_, (char *)pv, (int)cb, 0, (sockaddr*)&saddr,
342 &cbAddr);
343 UpdateLastError();
344 if ((received >= 0) && (paddr != NULL))
345 paddr->FromSockAddr(saddr);
346 bool success = (received >= 0) || IsBlockingError(error_);
347 if (udp_ || success) {
348 enabled_events_ |= DE_READ;
349 }
350 if (!success) {
351 LOG_F(LS_VERBOSE) << "Error = " << error_;
352 }
353 return received;
354 }
355
Listen(int backlog)356 int Listen(int backlog) {
357 int err = ::listen(s_, backlog);
358 UpdateLastError();
359 if (err == 0) {
360 state_ = CS_CONNECTING;
361 enabled_events_ |= DE_ACCEPT;
362 #ifdef _DEBUG
363 dbg_addr_ = "Listening @ ";
364 dbg_addr_.append(GetLocalAddress().ToString());
365 #endif // _DEBUG
366 }
367 return err;
368 }
369
Accept(SocketAddress * paddr)370 AsyncSocket* Accept(SocketAddress *paddr) {
371 sockaddr_in saddr;
372 socklen_t cbAddr = sizeof(saddr);
373 SOCKET s = ::accept(s_, (sockaddr*)&saddr, &cbAddr);
374 UpdateLastError();
375 if (s == INVALID_SOCKET)
376 return NULL;
377 enabled_events_ |= DE_ACCEPT;
378 if (paddr != NULL)
379 paddr->FromSockAddr(saddr);
380 return ss_->WrapSocket(s);
381 }
382
Close()383 int Close() {
384 if (s_ == INVALID_SOCKET)
385 return 0;
386 int err = ::closesocket(s_);
387 UpdateLastError();
388 s_ = INVALID_SOCKET;
389 state_ = CS_CLOSED;
390 enabled_events_ = 0;
391 if (resolver_) {
392 resolver_->Destroy(false);
393 resolver_ = NULL;
394 }
395 return err;
396 }
397
EstimateMTU(uint16 * mtu)398 int EstimateMTU(uint16* mtu) {
399 SocketAddress addr = GetRemoteAddress();
400 if (addr.IsAny()) {
401 error_ = ENOTCONN;
402 return -1;
403 }
404
405 #if defined(WIN32)
406 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
407 WinPing ping;
408 if (!ping.IsValid()) {
409 error_ = EINVAL; // can't think of a better error ID
410 return -1;
411 }
412
413 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
414 int32 size = PACKET_MAXIMUMS[level] - IP_HEADER_SIZE - ICMP_HEADER_SIZE;
415 WinPing::PingResult result = ping.Ping(addr.ip(), size, 0, 1, false);
416 if (result == WinPing::PING_FAIL) {
417 error_ = EINVAL; // can't think of a better error ID
418 return -1;
419 } else if (result != WinPing::PING_TOO_LARGE) {
420 *mtu = PACKET_MAXIMUMS[level];
421 return 0;
422 }
423 }
424
425 ASSERT(false);
426 return -1;
427 #elif defined(IOS) || defined(OSX)
428 // No simple way to do this on Mac OS X.
429 // SIOCGIFMTU would work if we knew which interface would be used, but
430 // figuring that out is pretty complicated. For now we'll return an error
431 // and let the caller pick a default MTU.
432 error_ = EINVAL;
433 return -1;
434 #elif defined(LINUX) || defined(ANDROID)
435 // Gets the path MTU.
436 int value;
437 socklen_t vlen = sizeof(value);
438 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
439 if (err < 0) {
440 UpdateLastError();
441 return err;
442 }
443
444 ASSERT((0 <= value) && (value <= 65536));
445 *mtu = value;
446 return 0;
447 #endif
448 }
449
socketserver()450 SocketServer* socketserver() { return ss_; }
451
452 protected:
OnResolveResult(SignalThread * thread)453 void OnResolveResult(SignalThread* thread) {
454 if (thread != resolver_) {
455 return;
456 }
457
458 int error = resolver_->error();
459 if (error == 0) {
460 error = DoConnect(resolver_->address());
461 } else {
462 Close();
463 }
464
465 if (error) {
466 error_ = error;
467 SignalCloseEvent(this, error_);
468 }
469 }
470
UpdateLastError()471 void UpdateLastError() {
472 error_ = LAST_SYSTEM_ERROR;
473 }
474
TranslateOption(Option opt,int * slevel,int * sopt)475 static int TranslateOption(Option opt, int* slevel, int* sopt) {
476 switch (opt) {
477 case OPT_DONTFRAGMENT:
478 #ifdef WIN32
479 *slevel = IPPROTO_IP;
480 *sopt = IP_DONTFRAGMENT;
481 break;
482 #elif defined(IOS) || defined(OSX) || defined(BSD)
483 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
484 return -1;
485 #elif defined(POSIX)
486 *slevel = IPPROTO_IP;
487 *sopt = IP_MTU_DISCOVER;
488 break;
489 #endif
490 case OPT_RCVBUF:
491 *slevel = SOL_SOCKET;
492 *sopt = SO_RCVBUF;
493 break;
494 case OPT_SNDBUF:
495 *slevel = SOL_SOCKET;
496 *sopt = SO_SNDBUF;
497 break;
498 case OPT_NODELAY:
499 *slevel = IPPROTO_TCP;
500 *sopt = TCP_NODELAY;
501 break;
502 default:
503 ASSERT(false);
504 return -1;
505 }
506 return 0;
507 }
508
509 PhysicalSocketServer* ss_;
510 SOCKET s_;
511 uint8 enabled_events_;
512 bool udp_;
513 int error_;
514 ConnState state_;
515 AsyncResolver* resolver_;
516
517 #ifdef _DEBUG
518 std::string dbg_addr_;
519 #endif // _DEBUG;
520 };
521
522 #ifdef POSIX
523 class EventDispatcher : public Dispatcher {
524 public:
EventDispatcher(PhysicalSocketServer * ss)525 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
526 if (pipe(afd_) < 0)
527 LOG(LERROR) << "pipe failed";
528 ss_->Add(this);
529 }
530
~EventDispatcher()531 virtual ~EventDispatcher() {
532 ss_->Remove(this);
533 close(afd_[0]);
534 close(afd_[1]);
535 }
536
Signal()537 virtual void Signal() {
538 CritScope cs(&crit_);
539 if (!fSignaled_) {
540 const uint8 b[1] = { 0 };
541 if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
542 fSignaled_ = true;
543 }
544 }
545 }
546
GetRequestedEvents()547 virtual uint32 GetRequestedEvents() {
548 return DE_READ;
549 }
550
OnPreEvent(uint32 ff)551 virtual void OnPreEvent(uint32 ff) {
552 // It is not possible to perfectly emulate an auto-resetting event with
553 // pipes. This simulates it by resetting before the event is handled.
554
555 CritScope cs(&crit_);
556 if (fSignaled_) {
557 uint8 b[4]; // Allow for reading more than 1 byte, but expect 1.
558 VERIFY(1 == read(afd_[0], b, sizeof(b)));
559 fSignaled_ = false;
560 }
561 }
562
OnEvent(uint32 ff,int err)563 virtual void OnEvent(uint32 ff, int err) {
564 ASSERT(false);
565 }
566
GetDescriptor()567 virtual int GetDescriptor() {
568 return afd_[0];
569 }
570
IsDescriptorClosed()571 virtual bool IsDescriptorClosed() {
572 return false;
573 }
574
575 private:
576 PhysicalSocketServer *ss_;
577 int afd_[2];
578 bool fSignaled_;
579 CriticalSection crit_;
580 };
581
582 // These two classes use the self-pipe trick to deliver POSIX signals to our
583 // select loop. This is the only safe, reliable, cross-platform way to do
584 // non-trivial things with a POSIX signal in an event-driven program (until
585 // proper pselect() implementations become ubiquitous).
586
587 class PosixSignalHandler {
588 public:
589 // POSIX only specifies 32 signals, but in principle the system might have
590 // more and the programmer might choose to use them, so we size our array
591 // for 128.
592 static const int kNumPosixSignals = 128;
593
Instance()594 static PosixSignalHandler *Instance() { return &instance_; }
595
596 // Returns true if the given signal number is set.
IsSignalSet(int signum) const597 bool IsSignalSet(int signum) const {
598 ASSERT(signum < ARRAY_SIZE(received_signal_));
599 if (signum < ARRAY_SIZE(received_signal_)) {
600 return received_signal_[signum];
601 } else {
602 return false;
603 }
604 }
605
606 // Clears the given signal number.
ClearSignal(int signum)607 void ClearSignal(int signum) {
608 ASSERT(signum < ARRAY_SIZE(received_signal_));
609 if (signum < ARRAY_SIZE(received_signal_)) {
610 received_signal_[signum] = false;
611 }
612 }
613
614 // Returns the file descriptor to monitor for signal events.
GetDescriptor() const615 int GetDescriptor() const {
616 return afd_[0];
617 }
618
619 // This is called directly from our real signal handler, so it must be
620 // signal-handler-safe. That means it cannot assume anything about the
621 // user-level state of the process, since the handler could be executed at any
622 // time on any thread.
OnPosixSignalReceived(int signum)623 void OnPosixSignalReceived(int signum) {
624 if (signum >= ARRAY_SIZE(received_signal_)) {
625 // We don't have space in our array for this.
626 return;
627 }
628 // Set a flag saying we've seen this signal.
629 received_signal_[signum] = true;
630 // Notify application code that we got a signal.
631 const uint8 b[1] = { 0 };
632 if (-1 == write(afd_[1], b, sizeof(b))) {
633 // Nothing we can do here. If there's an error somehow then there's
634 // nothing we can safely do from a signal handler.
635 // No, we can't even safely log it.
636 // But, we still have to check the return value here. Otherwise,
637 // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
638 return;
639 }
640 }
641
642 private:
PosixSignalHandler()643 PosixSignalHandler() {
644 if (pipe(afd_) < 0) {
645 LOG_ERR(LS_ERROR) << "pipe failed";
646 return;
647 }
648 if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
649 LOG_ERR(LS_WARNING) << "fcntl #1 failed";
650 }
651 if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
652 LOG_ERR(LS_WARNING) << "fcntl #2 failed";
653 }
654 memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
655 0,
656 sizeof(received_signal_));
657 }
658
~PosixSignalHandler()659 ~PosixSignalHandler() {
660 int fd1 = afd_[0];
661 int fd2 = afd_[1];
662 // We clobber the stored file descriptor numbers here or else in principle
663 // a signal that happens to be delivered during application termination
664 // could erroneously write a zero byte to an unrelated file handle in
665 // OnPosixSignalReceived() if some other file happens to be opened later
666 // during shutdown and happens to be given the same file descriptor number
667 // as our pipe had. Unfortunately even with this precaution there is still a
668 // race where that could occur if said signal happens to be handled
669 // concurrently with this code and happens to have already read the value of
670 // afd_[1] from memory before we clobber it, but that's unlikely.
671 afd_[0] = -1;
672 afd_[1] = -1;
673 close(fd1);
674 close(fd2);
675 }
676
677 // There is just a single global instance. (Signal handlers do not get any
678 // sort of user-defined void * parameter, so they can't access anything that
679 // isn't global.)
680 static PosixSignalHandler instance_;
681
682 int afd_[2];
683 // These are boolean flags that will be set in our signal handler and read
684 // and cleared from Wait(). There is a race involved in this, but it is
685 // benign. The signal handler sets the flag before signaling the pipe, so
686 // we'll never end up blocking in select() while a flag is still true.
687 // However, if two of the same signal arrive close to each other then it's
688 // possible that the second time the handler may set the flag while it's still
689 // true, meaning that signal will be missed. But the first occurrence of it
690 // will still be handled, so this isn't a problem.
691 // Volatile is not necessary here for correctness, but this data _is_ volatile
692 // so I've marked it as such.
693 volatile uint8 received_signal_[kNumPosixSignals];
694 };
695
696 PosixSignalHandler PosixSignalHandler::instance_;
697
698 class PosixSignalDispatcher : public Dispatcher {
699 public:
PosixSignalDispatcher(PhysicalSocketServer * owner)700 PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
701 owner_->Add(this);
702 }
703
~PosixSignalDispatcher()704 virtual ~PosixSignalDispatcher() {
705 owner_->Remove(this);
706 }
707
GetRequestedEvents()708 virtual uint32 GetRequestedEvents() {
709 return DE_READ;
710 }
711
OnPreEvent(uint32 ff)712 virtual void OnPreEvent(uint32 ff) {
713 // Events might get grouped if signals come very fast, so we read out up to
714 // 16 bytes to make sure we keep the pipe empty.
715 uint8 b[16];
716 ssize_t ret = read(GetDescriptor(), b, sizeof(b));
717 if (ret < 0) {
718 LOG_ERR(LS_WARNING) << "Error in read()";
719 } else if (ret == 0) {
720 LOG(LS_WARNING) << "Should have read at least one byte";
721 }
722 }
723
OnEvent(uint32 ff,int err)724 virtual void OnEvent(uint32 ff, int err) {
725 for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
726 ++signum) {
727 if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
728 PosixSignalHandler::Instance()->ClearSignal(signum);
729 HandlerMap::iterator i = handlers_.find(signum);
730 if (i == handlers_.end()) {
731 // This can happen if a signal is delivered to our process at around
732 // the same time as we unset our handler for it. It is not an error
733 // condition, but it's unusual enough to be worth logging.
734 LOG(LS_INFO) << "Received signal with no handler: " << signum;
735 } else {
736 // Otherwise, execute our handler.
737 (*i->second)(signum);
738 }
739 }
740 }
741 }
742
GetDescriptor()743 virtual int GetDescriptor() {
744 return PosixSignalHandler::Instance()->GetDescriptor();
745 }
746
IsDescriptorClosed()747 virtual bool IsDescriptorClosed() {
748 return false;
749 }
750
SetHandler(int signum,void (* handler)(int))751 void SetHandler(int signum, void (*handler)(int)) {
752 handlers_[signum] = handler;
753 }
754
ClearHandler(int signum)755 void ClearHandler(int signum) {
756 handlers_.erase(signum);
757 }
758
HasHandlers()759 bool HasHandlers() {
760 return !handlers_.empty();
761 }
762
763 private:
764 typedef std::map<int, void (*)(int)> HandlerMap;
765
766 HandlerMap handlers_;
767 // Our owner.
768 PhysicalSocketServer *owner_;
769 };
770
771 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
772 public:
SocketDispatcher(PhysicalSocketServer * ss)773 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
774 }
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)775 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
776 }
777
~SocketDispatcher()778 virtual ~SocketDispatcher() {
779 Close();
780 }
781
Initialize()782 bool Initialize() {
783 ss_->Add(this);
784 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
785 return true;
786 }
787
Create(int type)788 virtual bool Create(int type) {
789 // Change the socket to be non-blocking.
790 if (!PhysicalSocket::Create(type))
791 return false;
792
793 return Initialize();
794 }
795
GetDescriptor()796 virtual int GetDescriptor() {
797 return s_;
798 }
799
IsDescriptorClosed()800 virtual bool IsDescriptorClosed() {
801 // We don't have a reliable way of distinguishing end-of-stream
802 // from readability. So test on each readable call. Is this
803 // inefficient? Probably.
804 char ch;
805 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
806 if (res > 0) {
807 // Data available, so not closed.
808 return false;
809 } else if (res == 0) {
810 // EOF, so closed.
811 return true;
812 } else { // error
813 switch (errno) {
814 // Returned if we've already closed s_.
815 case EBADF:
816 // Returned during ungraceful peer shutdown.
817 case ECONNRESET:
818 return true;
819 default:
820 // Assume that all other errors are just blocking errors, meaning the
821 // connection is still good but we just can't read from it right now.
822 // This should only happen when connecting (and at most once), because
823 // in all other cases this function is only called if the file
824 // descriptor is already known to be in the readable state. However,
825 // it's not necessary a problem if we spuriously interpret a
826 // "connection lost"-type error as a blocking error, because typically
827 // the next recv() will get EOF, so we'll still eventually notice that
828 // the socket is closed.
829 LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
830 return false;
831 }
832 }
833 }
834
GetRequestedEvents()835 virtual uint32 GetRequestedEvents() {
836 return enabled_events_;
837 }
838
OnPreEvent(uint32 ff)839 virtual void OnPreEvent(uint32 ff) {
840 if ((ff & DE_CONNECT) != 0)
841 state_ = CS_CONNECTED;
842 if ((ff & DE_CLOSE) != 0)
843 state_ = CS_CLOSED;
844 }
845
OnEvent(uint32 ff,int err)846 virtual void OnEvent(uint32 ff, int err) {
847 if ((ff & DE_READ) != 0) {
848 enabled_events_ &= ~DE_READ;
849 SignalReadEvent(this);
850 }
851 if ((ff & DE_WRITE) != 0) {
852 enabled_events_ &= ~DE_WRITE;
853 SignalWriteEvent(this);
854 }
855 if ((ff & DE_CONNECT) != 0) {
856 enabled_events_ &= ~DE_CONNECT;
857 SignalConnectEvent(this);
858 }
859 if ((ff & DE_ACCEPT) != 0) {
860 enabled_events_ &= ~DE_ACCEPT;
861 SignalReadEvent(this);
862 }
863 if ((ff & DE_CLOSE) != 0) {
864 // The socket is now dead to us, so stop checking it.
865 enabled_events_ = 0;
866 SignalCloseEvent(this, err);
867 }
868 }
869
Close()870 virtual int Close() {
871 if (s_ == INVALID_SOCKET)
872 return 0;
873
874 ss_->Remove(this);
875 return PhysicalSocket::Close();
876 }
877 };
878
879 class FileDispatcher: public Dispatcher, public AsyncFile {
880 public:
FileDispatcher(int fd,PhysicalSocketServer * ss)881 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
882 set_readable(true);
883
884 ss_->Add(this);
885
886 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
887 }
888
~FileDispatcher()889 virtual ~FileDispatcher() {
890 ss_->Remove(this);
891 }
892
socketserver()893 SocketServer* socketserver() { return ss_; }
894
GetDescriptor()895 virtual int GetDescriptor() {
896 return fd_;
897 }
898
IsDescriptorClosed()899 virtual bool IsDescriptorClosed() {
900 return false;
901 }
902
GetRequestedEvents()903 virtual uint32 GetRequestedEvents() {
904 return flags_;
905 }
906
OnPreEvent(uint32 ff)907 virtual void OnPreEvent(uint32 ff) {
908 }
909
OnEvent(uint32 ff,int err)910 virtual void OnEvent(uint32 ff, int err) {
911 if ((ff & DE_READ) != 0)
912 SignalReadEvent(this);
913 if ((ff & DE_WRITE) != 0)
914 SignalWriteEvent(this);
915 if ((ff & DE_CLOSE) != 0)
916 SignalCloseEvent(this, err);
917 }
918
readable()919 virtual bool readable() {
920 return (flags_ & DE_READ) != 0;
921 }
922
set_readable(bool value)923 virtual void set_readable(bool value) {
924 flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
925 }
926
writable()927 virtual bool writable() {
928 return (flags_ & DE_WRITE) != 0;
929 }
930
set_writable(bool value)931 virtual void set_writable(bool value) {
932 flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
933 }
934
935 private:
936 PhysicalSocketServer* ss_;
937 int fd_;
938 int flags_;
939 };
940
CreateFile(int fd)941 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
942 return new FileDispatcher(fd, this);
943 }
944
945 #endif // POSIX
946
947 #ifdef WIN32
FlagsToEvents(uint32 events)948 static uint32 FlagsToEvents(uint32 events) {
949 uint32 ffFD = FD_CLOSE;
950 if (events & DE_READ)
951 ffFD |= FD_READ;
952 if (events & DE_WRITE)
953 ffFD |= FD_WRITE;
954 if (events & DE_CONNECT)
955 ffFD |= FD_CONNECT;
956 if (events & DE_ACCEPT)
957 ffFD |= FD_ACCEPT;
958 return ffFD;
959 }
960
961 class EventDispatcher : public Dispatcher {
962 public:
EventDispatcher(PhysicalSocketServer * ss)963 EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
964 hev_ = WSACreateEvent();
965 if (hev_) {
966 ss_->Add(this);
967 }
968 }
969
~EventDispatcher()970 ~EventDispatcher() {
971 if (hev_ != NULL) {
972 ss_->Remove(this);
973 WSACloseEvent(hev_);
974 hev_ = NULL;
975 }
976 }
977
Signal()978 virtual void Signal() {
979 if (hev_ != NULL)
980 WSASetEvent(hev_);
981 }
982
GetRequestedEvents()983 virtual uint32 GetRequestedEvents() {
984 return 0;
985 }
986
OnPreEvent(uint32 ff)987 virtual void OnPreEvent(uint32 ff) {
988 WSAResetEvent(hev_);
989 }
990
OnEvent(uint32 ff,int err)991 virtual void OnEvent(uint32 ff, int err) {
992 }
993
GetWSAEvent()994 virtual WSAEVENT GetWSAEvent() {
995 return hev_;
996 }
997
GetSocket()998 virtual SOCKET GetSocket() {
999 return INVALID_SOCKET;
1000 }
1001
CheckSignalClose()1002 virtual bool CheckSignalClose() { return false; }
1003
1004 private:
1005 PhysicalSocketServer* ss_;
1006 WSAEVENT hev_;
1007 };
1008
1009 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1010 public:
1011 static int next_id_;
1012 int id_;
1013 bool signal_close_;
1014 int signal_err_;
1015
SocketDispatcher(PhysicalSocketServer * ss)1016 SocketDispatcher(PhysicalSocketServer* ss)
1017 : PhysicalSocket(ss),
1018 id_(0),
1019 signal_close_(false) {
1020 }
1021
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)1022 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1023 : PhysicalSocket(ss, s),
1024 id_(0),
1025 signal_close_(false) {
1026 }
1027
~SocketDispatcher()1028 virtual ~SocketDispatcher() {
1029 Close();
1030 }
1031
Initialize()1032 bool Initialize() {
1033 ASSERT(s_ != INVALID_SOCKET);
1034 // Must be a non-blocking
1035 u_long argp = 1;
1036 ioctlsocket(s_, FIONBIO, &argp);
1037 ss_->Add(this);
1038 return true;
1039 }
1040
Create(int type)1041 virtual bool Create(int type) {
1042 // Create socket
1043 if (!PhysicalSocket::Create(type))
1044 return false;
1045
1046 if (!Initialize())
1047 return false;
1048
1049 do { id_ = ++next_id_; } while (id_ == 0);
1050 return true;
1051 }
1052
Close()1053 virtual int Close() {
1054 if (s_ == INVALID_SOCKET)
1055 return 0;
1056
1057 id_ = 0;
1058 signal_close_ = false;
1059 ss_->Remove(this);
1060 return PhysicalSocket::Close();
1061 }
1062
GetRequestedEvents()1063 virtual uint32 GetRequestedEvents() {
1064 return enabled_events_;
1065 }
1066
OnPreEvent(uint32 ff)1067 virtual void OnPreEvent(uint32 ff) {
1068 if ((ff & DE_CONNECT) != 0)
1069 state_ = CS_CONNECTED;
1070 // We set CS_CLOSED from CheckSignalClose.
1071 }
1072
OnEvent(uint32 ff,int err)1073 virtual void OnEvent(uint32 ff, int err) {
1074 int cache_id = id_;
1075 if ((ff & DE_READ) != 0) {
1076 enabled_events_ &= ~DE_READ;
1077 SignalReadEvent(this);
1078 }
1079 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1080 enabled_events_ &= ~DE_WRITE;
1081 SignalWriteEvent(this);
1082 }
1083 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1084 if (ff != DE_CONNECT)
1085 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1086 enabled_events_ &= ~DE_CONNECT;
1087 #ifdef _DEBUG
1088 dbg_addr_ = "Connected @ ";
1089 dbg_addr_.append(GetRemoteAddress().ToString());
1090 #endif // _DEBUG
1091 SignalConnectEvent(this);
1092 }
1093 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1094 enabled_events_ &= ~DE_ACCEPT;
1095 SignalReadEvent(this);
1096 }
1097 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1098 signal_close_ = true;
1099 signal_err_ = err;
1100 }
1101 }
1102
GetWSAEvent()1103 virtual WSAEVENT GetWSAEvent() {
1104 return WSA_INVALID_EVENT;
1105 }
1106
GetSocket()1107 virtual SOCKET GetSocket() {
1108 return s_;
1109 }
1110
CheckSignalClose()1111 virtual bool CheckSignalClose() {
1112 if (!signal_close_)
1113 return false;
1114
1115 char ch;
1116 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1117 return false;
1118
1119 state_ = CS_CLOSED;
1120 signal_close_ = false;
1121 SignalCloseEvent(this, signal_err_);
1122 return true;
1123 }
1124 };
1125
1126 int SocketDispatcher::next_id_ = 0;
1127
1128 #endif // WIN32
1129
1130 // Sets the value of a boolean value to false when signaled.
1131 class Signaler : public EventDispatcher {
1132 public:
Signaler(PhysicalSocketServer * ss,bool * pf)1133 Signaler(PhysicalSocketServer* ss, bool* pf)
1134 : EventDispatcher(ss), pf_(pf) {
1135 }
~Signaler()1136 virtual ~Signaler() { }
1137
OnEvent(uint32 ff,int err)1138 void OnEvent(uint32 ff, int err) {
1139 if (pf_)
1140 *pf_ = false;
1141 }
1142
1143 private:
1144 bool *pf_;
1145 };
1146
PhysicalSocketServer()1147 PhysicalSocketServer::PhysicalSocketServer()
1148 : fWait_(false),
1149 last_tick_tracked_(0),
1150 last_tick_dispatch_count_(0) {
1151 signal_wakeup_ = new Signaler(this, &fWait_);
1152 #ifdef WIN32
1153 socket_ev_ = WSACreateEvent();
1154 #endif
1155 }
1156
~PhysicalSocketServer()1157 PhysicalSocketServer::~PhysicalSocketServer() {
1158 #ifdef WIN32
1159 WSACloseEvent(socket_ev_);
1160 #endif
1161 #ifdef POSIX
1162 signal_dispatcher_.reset();
1163 #endif
1164 delete signal_wakeup_;
1165 ASSERT(dispatchers_.empty());
1166 }
1167
WakeUp()1168 void PhysicalSocketServer::WakeUp() {
1169 signal_wakeup_->Signal();
1170 }
1171
CreateSocket(int type)1172 Socket* PhysicalSocketServer::CreateSocket(int type) {
1173 PhysicalSocket* socket = new PhysicalSocket(this);
1174 if (socket->Create(type)) {
1175 return socket;
1176 } else {
1177 delete socket;
1178 return 0;
1179 }
1180 }
1181
CreateAsyncSocket(int type)1182 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1183 SocketDispatcher* dispatcher = new SocketDispatcher(this);
1184 if (dispatcher->Create(type)) {
1185 return dispatcher;
1186 } else {
1187 delete dispatcher;
1188 return 0;
1189 }
1190 }
1191
WrapSocket(SOCKET s)1192 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1193 SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1194 if (dispatcher->Initialize()) {
1195 return dispatcher;
1196 } else {
1197 delete dispatcher;
1198 return 0;
1199 }
1200 }
1201
Add(Dispatcher * pdispatcher)1202 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1203 CritScope cs(&crit_);
1204 // Prevent duplicates. This can cause dead dispatchers to stick around.
1205 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1206 dispatchers_.end(),
1207 pdispatcher);
1208 if (pos != dispatchers_.end())
1209 return;
1210 dispatchers_.push_back(pdispatcher);
1211 }
1212
Remove(Dispatcher * pdispatcher)1213 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1214 CritScope cs(&crit_);
1215 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1216 dispatchers_.end(),
1217 pdispatcher);
1218 ASSERT(pos != dispatchers_.end());
1219 size_t index = pos - dispatchers_.begin();
1220 dispatchers_.erase(pos);
1221 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1222 ++it) {
1223 if (index < **it) {
1224 --**it;
1225 }
1226 }
1227 }
1228
1229 #ifdef POSIX
Wait(int cmsWait,bool process_io)1230 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1231 // Calculate timing information
1232
1233 struct timeval *ptvWait = NULL;
1234 struct timeval tvWait;
1235 struct timeval tvStop;
1236 if (cmsWait != kForever) {
1237 // Calculate wait timeval
1238 tvWait.tv_sec = cmsWait / 1000;
1239 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1240 ptvWait = &tvWait;
1241
1242 // Calculate when to return in a timeval
1243 gettimeofday(&tvStop, NULL);
1244 tvStop.tv_sec += tvWait.tv_sec;
1245 tvStop.tv_usec += tvWait.tv_usec;
1246 if (tvStop.tv_usec >= 1000000) {
1247 tvStop.tv_usec -= 1000000;
1248 tvStop.tv_sec += 1;
1249 }
1250 }
1251
1252 // Zero all fd_sets. Don't need to do this inside the loop since
1253 // select() zeros the descriptors not signaled
1254
1255 fd_set fdsRead;
1256 FD_ZERO(&fdsRead);
1257 fd_set fdsWrite;
1258 FD_ZERO(&fdsWrite);
1259
1260 fWait_ = true;
1261
1262 while (fWait_) {
1263 int fdmax = -1;
1264 {
1265 CritScope cr(&crit_);
1266 for (size_t i = 0; i < dispatchers_.size(); ++i) {
1267 // Query dispatchers for read and write wait state
1268 Dispatcher *pdispatcher = dispatchers_[i];
1269 ASSERT(pdispatcher);
1270 if (!process_io && (pdispatcher != signal_wakeup_))
1271 continue;
1272 int fd = pdispatcher->GetDescriptor();
1273 if (fd > fdmax)
1274 fdmax = fd;
1275
1276 uint32 ff = pdispatcher->GetRequestedEvents();
1277 if (ff & (DE_READ | DE_ACCEPT))
1278 FD_SET(fd, &fdsRead);
1279 if (ff & (DE_WRITE | DE_CONNECT))
1280 FD_SET(fd, &fdsWrite);
1281 }
1282 }
1283
1284 // Wait then call handlers as appropriate
1285 // < 0 means error
1286 // 0 means timeout
1287 // > 0 means count of descriptors ready
1288 int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
1289
1290 // If error, return error.
1291 if (n < 0) {
1292 if (errno != EINTR) {
1293 LOG_E(LS_ERROR, EN, errno) << "select";
1294 return false;
1295 }
1296 // Else ignore the error and keep going. If this EINTR was for one of the
1297 // signals managed by this PhysicalSocketServer, the
1298 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1299 // iteration.
1300 } else if (n == 0) {
1301 // If timeout, return success
1302 return true;
1303 } else {
1304 // We have signaled descriptors
1305 CritScope cr(&crit_);
1306 for (size_t i = 0; i < dispatchers_.size(); ++i) {
1307 Dispatcher *pdispatcher = dispatchers_[i];
1308 int fd = pdispatcher->GetDescriptor();
1309 uint32 ff = 0;
1310 int errcode = 0;
1311
1312 // Reap any error code, which can be signaled through reads or writes.
1313 // TODO: Should we set errcode if getsockopt fails?
1314 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1315 socklen_t len = sizeof(errcode);
1316 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1317 }
1318
1319 // Check readable descriptors. If we're waiting on an accept, signal
1320 // that. Otherwise we're waiting for data, check to see if we're
1321 // readable or really closed.
1322 // TODO: Only peek at TCP descriptors.
1323 if (FD_ISSET(fd, &fdsRead)) {
1324 FD_CLR(fd, &fdsRead);
1325 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1326 ff |= DE_ACCEPT;
1327 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1328 ff |= DE_CLOSE;
1329 } else {
1330 ff |= DE_READ;
1331 }
1332 }
1333
1334 // Check writable descriptors. If we're waiting on a connect, detect
1335 // success versus failure by the reaped error code.
1336 if (FD_ISSET(fd, &fdsWrite)) {
1337 FD_CLR(fd, &fdsWrite);
1338 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1339 if (!errcode) {
1340 ff |= DE_CONNECT;
1341 } else {
1342 ff |= DE_CLOSE;
1343 }
1344 } else {
1345 ff |= DE_WRITE;
1346 }
1347 }
1348
1349 // Tell the descriptor about the event.
1350 if (ff != 0) {
1351 pdispatcher->OnPreEvent(ff);
1352 pdispatcher->OnEvent(ff, errcode);
1353 }
1354 }
1355 }
1356
1357 // Recalc the time remaining to wait. Doing it here means it doesn't get
1358 // calced twice the first time through the loop
1359
1360 if (cmsWait != kForever) {
1361 ptvWait->tv_sec = 0;
1362 ptvWait->tv_usec = 0;
1363 struct timeval tvT;
1364 gettimeofday(&tvT, NULL);
1365 if ((tvStop.tv_sec > tvT.tv_sec)
1366 || ((tvStop.tv_sec == tvT.tv_sec)
1367 && (tvStop.tv_usec > tvT.tv_usec))) {
1368 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1369 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1370 if (ptvWait->tv_usec < 0) {
1371 ASSERT(ptvWait->tv_sec > 0);
1372 ptvWait->tv_usec += 1000000;
1373 ptvWait->tv_sec -= 1;
1374 }
1375 }
1376 }
1377 }
1378
1379 return true;
1380 }
1381
GlobalSignalHandler(int signum)1382 static void GlobalSignalHandler(int signum) {
1383 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1384 }
1385
SetPosixSignalHandler(int signum,void (* handler)(int))1386 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1387 void (*handler)(int)) {
1388 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1389 // otherwise set one.
1390 if (handler == SIG_IGN || handler == SIG_DFL) {
1391 if (!InstallSignal(signum, handler)) {
1392 return false;
1393 }
1394 if (signal_dispatcher_.get()) {
1395 signal_dispatcher_->ClearHandler(signum);
1396 if (!signal_dispatcher_->HasHandlers()) {
1397 signal_dispatcher_.reset();
1398 }
1399 }
1400 } else {
1401 if (!signal_dispatcher_.get()) {
1402 signal_dispatcher_.reset(new PosixSignalDispatcher(this));
1403 }
1404 signal_dispatcher_->SetHandler(signum, handler);
1405 if (!InstallSignal(signum, &GlobalSignalHandler)) {
1406 return false;
1407 }
1408 }
1409 return true;
1410 }
1411
InstallSignal(int signum,void (* handler)(int))1412 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
1413 struct sigaction act;
1414 // It doesn't really matter what we set this mask to.
1415 if (sigemptyset(&act.sa_mask) != 0) {
1416 LOG_ERR(LS_ERROR) << "Couldn't set mask";
1417 return false;
1418 }
1419 act.sa_handler = handler;
1420 // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
1421 // and it's a nuisance. Though some syscalls still return EINTR and there's no
1422 // real standard for which ones. :(
1423 act.sa_flags = SA_RESTART;
1424 if (sigaction(signum, &act, NULL) != 0) {
1425 LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
1426 return false;
1427 }
1428 return true;
1429 }
1430 #endif // POSIX
1431
1432 #ifdef WIN32
Wait(int cmsWait,bool process_io)1433 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1434 int cmsTotal = cmsWait;
1435 int cmsElapsed = 0;
1436 uint32 msStart = Time();
1437
1438 #if LOGGING
1439 if (last_tick_dispatch_count_ == 0) {
1440 last_tick_tracked_ = msStart;
1441 }
1442 #endif
1443
1444 fWait_ = true;
1445 while (fWait_) {
1446 std::vector<WSAEVENT> events;
1447 std::vector<Dispatcher *> event_owners;
1448
1449 events.push_back(socket_ev_);
1450
1451 {
1452 CritScope cr(&crit_);
1453 size_t i = 0;
1454 iterators_.push_back(&i);
1455 // Don't track dispatchers_.size(), because we want to pick up any new
1456 // dispatchers that were added while processing the loop.
1457 while (i < dispatchers_.size()) {
1458 Dispatcher* disp = dispatchers_[i++];
1459 if (!process_io && (disp != signal_wakeup_))
1460 continue;
1461 SOCKET s = disp->GetSocket();
1462 if (disp->CheckSignalClose()) {
1463 // We just signalled close, don't poll this socket
1464 } else if (s != INVALID_SOCKET) {
1465 WSAEventSelect(s,
1466 events[0],
1467 FlagsToEvents(disp->GetRequestedEvents()));
1468 } else {
1469 events.push_back(disp->GetWSAEvent());
1470 event_owners.push_back(disp);
1471 }
1472 }
1473 ASSERT(iterators_.back() == &i);
1474 iterators_.pop_back();
1475 }
1476
1477 // Which is shorter, the delay wait or the asked wait?
1478
1479 int cmsNext;
1480 if (cmsWait == kForever) {
1481 cmsNext = cmsWait;
1482 } else {
1483 cmsNext = _max(0, cmsTotal - cmsElapsed);
1484 }
1485
1486 // Wait for one of the events to signal
1487 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
1488 &events[0],
1489 false,
1490 cmsNext,
1491 false);
1492
1493 #if 0 // LOGGING
1494 // we track this information purely for logging purposes.
1495 last_tick_dispatch_count_++;
1496 if (last_tick_dispatch_count_ >= 1000) {
1497 int32 elapsed = TimeSince(last_tick_tracked_);
1498 LOG(INFO) << "PhysicalSocketServer took " << elapsed
1499 << "ms for 1000 events";
1500
1501 // If we get more than 1000 events in a second, we are spinning badly
1502 // (normally it should take about 8-20 seconds).
1503 ASSERT(elapsed > 1000);
1504
1505 last_tick_tracked_ = Time();
1506 last_tick_dispatch_count_ = 0;
1507 }
1508 #endif
1509
1510 if (dw == WSA_WAIT_FAILED) {
1511 // Failed?
1512 // TODO: need a better strategy than this!
1513 int error = WSAGetLastError();
1514 ASSERT(false);
1515 return false;
1516 } else if (dw == WSA_WAIT_TIMEOUT) {
1517 // Timeout?
1518 return true;
1519 } else {
1520 // Figure out which one it is and call it
1521 CritScope cr(&crit_);
1522 int index = dw - WSA_WAIT_EVENT_0;
1523 if (index > 0) {
1524 --index; // The first event is the socket event
1525 event_owners[index]->OnPreEvent(0);
1526 event_owners[index]->OnEvent(0, 0);
1527 } else if (process_io) {
1528 size_t i = 0, end = dispatchers_.size();
1529 iterators_.push_back(&i);
1530 iterators_.push_back(&end); // Don't iterate over new dispatchers.
1531 while (i < end) {
1532 Dispatcher* disp = dispatchers_[i++];
1533 SOCKET s = disp->GetSocket();
1534 if (s == INVALID_SOCKET)
1535 continue;
1536
1537 WSANETWORKEVENTS wsaEvents;
1538 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1539 if (err == 0) {
1540
1541 #if LOGGING
1542 {
1543 if ((wsaEvents.lNetworkEvents & FD_READ) &&
1544 wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1545 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
1546 << wsaEvents.iErrorCode[FD_READ_BIT];
1547 }
1548 if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1549 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1550 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
1551 << wsaEvents.iErrorCode[FD_WRITE_BIT];
1552 }
1553 if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1554 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1555 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
1556 << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1557 }
1558 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1559 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1560 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1561 << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1562 }
1563 if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1564 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1565 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
1566 << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1567 }
1568 }
1569 #endif
1570 uint32 ff = 0;
1571 int errcode = 0;
1572 if (wsaEvents.lNetworkEvents & FD_READ)
1573 ff |= DE_READ;
1574 if (wsaEvents.lNetworkEvents & FD_WRITE)
1575 ff |= DE_WRITE;
1576 if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1577 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1578 ff |= DE_CONNECT;
1579 } else {
1580 ff |= DE_CLOSE;
1581 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1582 }
1583 }
1584 if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1585 ff |= DE_ACCEPT;
1586 if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1587 ff |= DE_CLOSE;
1588 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1589 }
1590 if (ff != 0) {
1591 disp->OnPreEvent(ff);
1592 disp->OnEvent(ff, errcode);
1593 }
1594 }
1595 }
1596 ASSERT(iterators_.back() == &end);
1597 iterators_.pop_back();
1598 ASSERT(iterators_.back() == &i);
1599 iterators_.pop_back();
1600 }
1601
1602 // Reset the network event until new activity occurs
1603 WSAResetEvent(socket_ev_);
1604 }
1605
1606 // Break?
1607 if (!fWait_)
1608 break;
1609 cmsElapsed = TimeSince(msStart);
1610 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1611 break;
1612 }
1613 }
1614
1615 // Done
1616 return true;
1617 }
1618 #endif // WIN32
1619
1620 } // namespace talk_base
1621