• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/udp/udp_socket_libevent.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <netdb.h>
10 #include <sys/socket.h>
11 
12 #include "base/eintr_wrapper.h"
13 #include "base/logging.h"
14 #include "base/message_loop.h"
15 #include "base/metrics/stats_counters.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/ip_endpoint.h"
18 #include "net/base/net_errors.h"
19 #include "net/base/net_log.h"
20 #include "net/base/net_util.h"
21 #if defined(OS_POSIX)
22 #include <netinet/in.h>
23 #endif
24 #if defined(USE_SYSTEM_LIBEVENT)
25 #include <event.h>
26 #else
27 #include "third_party/libevent/event.h"
28 #endif
29 
30 namespace net {
31 
UDPSocketLibevent(net::NetLog * net_log,const net::NetLog::Source & source)32 UDPSocketLibevent::UDPSocketLibevent(net::NetLog* net_log,
33                                      const net::NetLog::Source& source)
34     : socket_(kInvalidSocket),
35       read_watcher_(this),
36       write_watcher_(this),
37       read_buf_len_(0),
38       recv_from_address_(NULL),
39       write_buf_len_(0),
40       read_callback_(NULL),
41       write_callback_(NULL),
42       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)) {
43   scoped_refptr<NetLog::EventParameters> params;
44   if (source.is_valid())
45     params = new NetLogSourceParameter("source_dependency", source);
46   net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE, params);
47 }
48 
~UDPSocketLibevent()49 UDPSocketLibevent::~UDPSocketLibevent() {
50   Close();
51   net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE, NULL);
52 }
53 
Close()54 void UDPSocketLibevent::Close() {
55   DCHECK(CalledOnValidThread());
56 
57   if (!is_connected())
58     return;
59 
60   // Zero out any pending read/write callback state.
61   read_buf_ = NULL;
62   read_buf_len_ = 0;
63   read_callback_ = NULL;
64   recv_from_address_ = NULL;
65   write_buf_ = NULL;
66   write_buf_len_ = 0;
67   write_callback_ = NULL;
68   send_to_address_.reset();
69 
70   bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
71   DCHECK(ok);
72   ok = write_socket_watcher_.StopWatchingFileDescriptor();
73   DCHECK(ok);
74 
75   if (HANDLE_EINTR(close(socket_)) < 0)
76     PLOG(ERROR) << "close";
77 
78   socket_ = kInvalidSocket;
79 }
80 
GetPeerAddress(IPEndPoint * address) const81 int UDPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
82   DCHECK(CalledOnValidThread());
83   DCHECK(address);
84   if (!is_connected())
85     return ERR_SOCKET_NOT_CONNECTED;
86 
87   if (!remote_address_.get()) {
88     struct sockaddr_storage addr_storage;
89     socklen_t addr_len = sizeof(addr_storage);
90     struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
91     if (getpeername(socket_, addr, &addr_len))
92       return MapSystemError(errno);
93     scoped_ptr<IPEndPoint> address(new IPEndPoint());
94     if (!address->FromSockAddr(addr, addr_len))
95       return ERR_FAILED;
96     remote_address_.reset(address.release());
97   }
98 
99   *address = *remote_address_;
100   return OK;
101 }
102 
GetLocalAddress(IPEndPoint * address) const103 int UDPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
104   DCHECK(CalledOnValidThread());
105   DCHECK(address);
106   if (!is_connected())
107     return ERR_SOCKET_NOT_CONNECTED;
108 
109   if (!local_address_.get()) {
110     struct sockaddr_storage addr_storage;
111     socklen_t addr_len = sizeof(addr_storage);
112     struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
113     if (getsockname(socket_, addr, &addr_len))
114       return MapSystemError(errno);
115     scoped_ptr<IPEndPoint> address(new IPEndPoint());
116     if (!address->FromSockAddr(addr, addr_len))
117       return ERR_FAILED;
118     local_address_.reset(address.release());
119   }
120 
121   *address = *local_address_;
122   return OK;
123 }
124 
Read(IOBuffer * buf,int buf_len,CompletionCallback * callback)125 int UDPSocketLibevent::Read(IOBuffer* buf,
126                             int buf_len,
127                             CompletionCallback* callback) {
128   return RecvFrom(buf, buf_len, NULL, callback);
129 }
130 
RecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address,CompletionCallback * callback)131 int UDPSocketLibevent::RecvFrom(IOBuffer* buf,
132                                 int buf_len,
133                                 IPEndPoint* address,
134                                 CompletionCallback* callback) {
135   DCHECK(CalledOnValidThread());
136   DCHECK_NE(kInvalidSocket, socket_);
137   DCHECK(!read_callback_);
138   DCHECK(!recv_from_address_);
139   DCHECK(callback);  // Synchronous operation not supported
140   DCHECK_GT(buf_len, 0);
141 
142   int nread = InternalRecvFrom(buf, buf_len, address);
143   if (nread != ERR_IO_PENDING)
144     return nread;
145 
146   if (!MessageLoopForIO::current()->WatchFileDescriptor(
147           socket_, true, MessageLoopForIO::WATCH_READ,
148           &read_socket_watcher_, &read_watcher_)) {
149     PLOG(ERROR) << "WatchFileDescriptor failed on read";
150     return MapSystemError(errno);
151   }
152 
153   read_buf_ = buf;
154   read_buf_len_ = buf_len;
155   recv_from_address_ = address;
156   read_callback_ = callback;
157   return ERR_IO_PENDING;
158 }
159 
Write(IOBuffer * buf,int buf_len,CompletionCallback * callback)160 int UDPSocketLibevent::Write(IOBuffer* buf,
161                              int buf_len,
162                              CompletionCallback* callback) {
163   return SendToOrWrite(buf, buf_len, NULL, callback);
164 }
165 
SendTo(IOBuffer * buf,int buf_len,const IPEndPoint & address,CompletionCallback * callback)166 int UDPSocketLibevent::SendTo(IOBuffer* buf,
167                               int buf_len,
168                               const IPEndPoint& address,
169                               CompletionCallback* callback) {
170   return SendToOrWrite(buf, buf_len, &address, callback);
171 }
172 
SendToOrWrite(IOBuffer * buf,int buf_len,const IPEndPoint * address,CompletionCallback * callback)173 int UDPSocketLibevent::SendToOrWrite(IOBuffer* buf,
174                                      int buf_len,
175                                      const IPEndPoint* address,
176                                      CompletionCallback* callback) {
177   DCHECK(CalledOnValidThread());
178   DCHECK_NE(kInvalidSocket, socket_);
179   DCHECK(!write_callback_);
180   DCHECK(callback);  // Synchronous operation not supported
181   DCHECK_GT(buf_len, 0);
182 
183   int nwrite = InternalSendTo(buf, buf_len, address);
184   if (nwrite >= 0) {
185     base::StatsCounter write_bytes("udp.write_bytes");
186     write_bytes.Add(nwrite);
187     return nwrite;
188   }
189   if (errno != EAGAIN && errno != EWOULDBLOCK)
190     return MapSystemError(errno);
191 
192   if (!MessageLoopForIO::current()->WatchFileDescriptor(
193           socket_, true, MessageLoopForIO::WATCH_WRITE,
194           &write_socket_watcher_, &write_watcher_)) {
195     DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
196     return MapSystemError(errno);
197   }
198 
199   write_buf_ = buf;
200   write_buf_len_ = buf_len;
201   DCHECK(!send_to_address_.get());
202   if (address) {
203     send_to_address_.reset(new IPEndPoint(*address));
204   }
205   write_callback_ = callback;
206   return ERR_IO_PENDING;
207 }
208 
Connect(const IPEndPoint & address)209 int UDPSocketLibevent::Connect(const IPEndPoint& address) {
210   DCHECK(!is_connected());
211   DCHECK(!remote_address_.get());
212   int rv = CreateSocket(address);
213   if (rv < 0)
214     return rv;
215 
216   struct sockaddr_storage addr_storage;
217   size_t addr_len = sizeof(addr_storage);
218   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
219   if (!address.ToSockAddr(addr, &addr_len))
220     return ERR_FAILED;
221 
222   rv = HANDLE_EINTR(connect(socket_, addr, addr_len));
223   if (rv < 0)
224     return MapSystemError(errno);
225 
226   remote_address_.reset(new IPEndPoint(address));
227   return rv;
228 }
229 
Bind(const IPEndPoint & address)230 int UDPSocketLibevent::Bind(const IPEndPoint& address) {
231   DCHECK(!is_connected());
232   DCHECK(!local_address_.get());
233   int rv = CreateSocket(address);
234   if (rv < 0)
235     return rv;
236 
237   struct sockaddr_storage addr_storage;
238   size_t addr_len = sizeof(addr_storage);
239   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
240   if (!address.ToSockAddr(addr, &addr_len))
241     return ERR_FAILED;
242 
243   rv = bind(socket_, addr, addr_len);
244   if (rv < 0)
245     return MapSystemError(errno);
246 
247   local_address_.reset();
248   return rv;
249 }
250 
DoReadCallback(int rv)251 void UDPSocketLibevent::DoReadCallback(int rv) {
252   DCHECK_NE(rv, ERR_IO_PENDING);
253   DCHECK(read_callback_);
254 
255   // since Run may result in Read being called, clear read_callback_ up front.
256   CompletionCallback* c = read_callback_;
257   read_callback_ = NULL;
258   c->Run(rv);
259 }
260 
DoWriteCallback(int rv)261 void UDPSocketLibevent::DoWriteCallback(int rv) {
262   DCHECK_NE(rv, ERR_IO_PENDING);
263   DCHECK(write_callback_);
264 
265   // since Run may result in Write being called, clear write_callback_ up front.
266   CompletionCallback* c = write_callback_;
267   write_callback_ = NULL;
268   c->Run(rv);
269 }
270 
DidCompleteRead()271 void UDPSocketLibevent::DidCompleteRead() {
272   int result = InternalRecvFrom(read_buf_, read_buf_len_, recv_from_address_);
273   if (result != ERR_IO_PENDING) {
274     read_buf_ = NULL;
275     read_buf_len_ = 0;
276     recv_from_address_ = NULL;
277     bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
278     DCHECK(ok);
279     DoReadCallback(result);
280   }
281 }
282 
CreateSocket(const IPEndPoint & address)283 int UDPSocketLibevent::CreateSocket(const IPEndPoint& address) {
284   socket_ = socket(address.GetFamily(), SOCK_DGRAM, 0);
285   if (socket_ == kInvalidSocket)
286     return MapSystemError(errno);
287   if (SetNonBlocking(socket_)) {
288     const int err = MapSystemError(errno);
289     Close();
290     return err;
291   }
292   return OK;
293 }
294 
DidCompleteWrite()295 void UDPSocketLibevent::DidCompleteWrite() {
296   int result = InternalSendTo(write_buf_, write_buf_len_,
297                               send_to_address_.get());
298   if (result >= 0) {
299     base::StatsCounter write_bytes("udp.write_bytes");
300     write_bytes.Add(result);
301   } else {
302     result = MapSystemError(errno);
303   }
304 
305   if (result != ERR_IO_PENDING) {
306     write_buf_ = NULL;
307     write_buf_len_ = 0;
308     send_to_address_.reset();
309     write_socket_watcher_.StopWatchingFileDescriptor();
310     DoWriteCallback(result);
311   }
312 }
313 
InternalRecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address)314 int UDPSocketLibevent::InternalRecvFrom(IOBuffer* buf, int buf_len,
315                                         IPEndPoint* address) {
316   int bytes_transferred;
317   int flags = 0;
318 
319   struct sockaddr_storage addr_storage;
320   socklen_t addr_len = sizeof(addr_storage);
321   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
322 
323   bytes_transferred =
324       HANDLE_EINTR(recvfrom(socket_,
325                             buf->data(),
326                             buf_len,
327                             flags,
328                             addr,
329                             &addr_len));
330   int result;
331   if (bytes_transferred >= 0) {
332     result = bytes_transferred;
333     base::StatsCounter read_bytes("udp.read_bytes");
334     read_bytes.Add(bytes_transferred);
335     if (address) {
336       if (!address->FromSockAddr(addr, addr_len))
337         result = ERR_FAILED;
338     }
339   } else {
340     result = MapSystemError(errno);
341   }
342   return result;
343 }
344 
InternalSendTo(IOBuffer * buf,int buf_len,const IPEndPoint * address)345 int UDPSocketLibevent::InternalSendTo(IOBuffer* buf, int buf_len,
346                                       const IPEndPoint* address) {
347   struct sockaddr_storage addr_storage;
348   size_t addr_len = sizeof(addr_storage);
349   struct sockaddr* addr = reinterpret_cast<struct sockaddr*>(&addr_storage);
350 
351   if (!address) {
352     addr = NULL;
353     addr_len = 0;
354   } else {
355     if (!address->ToSockAddr(addr, &addr_len))
356       return ERR_FAILED;
357   }
358 
359   return HANDLE_EINTR(sendto(socket_,
360                              buf->data(),
361                              buf_len,
362                              0,
363                              addr,
364                              addr_len));
365 }
366 
367 }  // namespace net
368