• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "media/cast/net/udp_transport.h"
6 
7 #include <algorithm>
8 #include <string>
9 
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/rand_callback.h"
19 
20 namespace media {
21 namespace cast {
22 
23 namespace {
24 const int kMaxPacketSize = 1500;
25 
IsEmpty(const net::IPEndPoint & addr)26 bool IsEmpty(const net::IPEndPoint& addr) {
27   net::IPAddressNumber empty_addr(addr.address().size());
28   return std::equal(
29              empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
30          !addr.port();
31 }
32 
IsEqual(const net::IPEndPoint & addr1,const net::IPEndPoint & addr2)33 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
34   return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
35                                                     addr1.address().end(),
36                                                     addr2.address().begin());
37 }
38 }  // namespace
39 
UdpTransport(net::NetLog * net_log,const scoped_refptr<base::SingleThreadTaskRunner> & io_thread_proxy,const net::IPEndPoint & local_end_point,const net::IPEndPoint & remote_end_point,const CastTransportStatusCallback & status_callback)40 UdpTransport::UdpTransport(
41     net::NetLog* net_log,
42     const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
43     const net::IPEndPoint& local_end_point,
44     const net::IPEndPoint& remote_end_point,
45     const CastTransportStatusCallback& status_callback)
46     : io_thread_proxy_(io_thread_proxy),
47       local_addr_(local_end_point),
48       remote_addr_(remote_end_point),
49       udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
50                                      net::RandIntCallback(),
51                                      net_log,
52                                      net::NetLog::Source())),
53       send_pending_(false),
54       receive_pending_(false),
55       client_connected_(false),
56       next_dscp_value_(net::DSCP_NO_CHANGE),
57       status_callback_(status_callback),
58       bytes_sent_(0),
59       weak_factory_(this) {
60   DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
61 }
62 
~UdpTransport()63 UdpTransport::~UdpTransport() {}
64 
StartReceiving(const PacketReceiverCallback & packet_receiver)65 void UdpTransport::StartReceiving(
66     const PacketReceiverCallback& packet_receiver) {
67   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
68 
69   packet_receiver_ = packet_receiver;
70   udp_socket_->AllowAddressReuse();
71   udp_socket_->SetMulticastLoopbackMode(true);
72   if (!IsEmpty(local_addr_)) {
73     if (udp_socket_->Bind(local_addr_) < 0) {
74       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
75       LOG(ERROR) << "Failed to bind local address.";
76       return;
77     }
78   } else if (!IsEmpty(remote_addr_)) {
79     if (udp_socket_->Connect(remote_addr_) < 0) {
80       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
81       LOG(ERROR) << "Failed to connect to remote address.";
82       return;
83     }
84     client_connected_ = true;
85   } else {
86     NOTREACHED() << "Either local or remote address has to be defined.";
87   }
88 
89   ScheduleReceiveNextPacket();
90 }
91 
SetDscp(net::DiffServCodePoint dscp)92 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) {
93   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
94   next_dscp_value_ = dscp;
95 }
96 
ScheduleReceiveNextPacket()97 void UdpTransport::ScheduleReceiveNextPacket() {
98   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
99   if (!packet_receiver_.is_null() && !receive_pending_) {
100     receive_pending_ = true;
101     io_thread_proxy_->PostTask(FROM_HERE,
102                                base::Bind(&UdpTransport::ReceiveNextPacket,
103                                           weak_factory_.GetWeakPtr(),
104                                           net::ERR_IO_PENDING));
105   }
106 }
107 
ReceiveNextPacket(int length_or_status)108 void UdpTransport::ReceiveNextPacket(int length_or_status) {
109   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
110 
111   // Loop while UdpSocket is delivering data synchronously.  When it responds
112   // with a "pending" status, break and expect this method to be called back in
113   // the future when a packet is ready.
114   while (true) {
115     if (length_or_status == net::ERR_IO_PENDING) {
116       next_packet_.reset(new Packet(kMaxPacketSize));
117       recv_buf_ = new net::WrappedIOBuffer(
118           reinterpret_cast<char*>(&next_packet_->front()));
119       length_or_status =
120           udp_socket_->RecvFrom(recv_buf_.get(),
121                                 kMaxPacketSize,
122                                 &recv_addr_,
123                                 base::Bind(&UdpTransport::ReceiveNextPacket,
124                                            weak_factory_.GetWeakPtr()));
125       if (length_or_status == net::ERR_IO_PENDING) {
126         receive_pending_ = true;
127         return;
128       }
129     }
130 
131     // Note: At this point, either a packet is ready or an error has occurred.
132     if (length_or_status < 0) {
133       VLOG(1) << "Failed to receive packet: Status code is "
134               << length_or_status;
135       receive_pending_ = false;
136       return;
137     }
138 
139     // Confirm the packet has come from the expected remote address; otherwise,
140     // ignore it.  If this is the first packet being received and no remote
141     // address has been set, set the remote address and expect all future
142     // packets to come from the same one.
143     // TODO(hubbe): We should only do this if the caller used a valid ssrc.
144     if (IsEmpty(remote_addr_)) {
145       remote_addr_ = recv_addr_;
146       VLOG(1) << "Setting remote address from first received packet: "
147               << remote_addr_.ToString();
148     } else if (!IsEqual(remote_addr_, recv_addr_)) {
149       VLOG(1) << "Ignoring packet received from an unrecognized address: "
150               << recv_addr_.ToString() << ".";
151       length_or_status = net::ERR_IO_PENDING;
152       continue;
153     }
154 
155     next_packet_->resize(length_or_status);
156     packet_receiver_.Run(next_packet_.Pass());
157     length_or_status = net::ERR_IO_PENDING;
158   }
159 }
160 
SendPacket(PacketRef packet,const base::Closure & cb)161 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
162   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
163 
164   // Increase byte count no matter the packet was sent or dropped.
165   bytes_sent_ += packet->data.size();
166 
167   DCHECK(!send_pending_);
168   if (send_pending_) {
169     VLOG(1) << "Cannot send because of pending IO.";
170     return true;
171   }
172 
173   if (next_dscp_value_ != net::DSCP_NO_CHANGE) {
174     int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_);
175     if (result != net::OK) {
176       VLOG(1) << "Unable to set DSCP: " << next_dscp_value_
177               << " to socket; Error: " << result;
178     }
179 
180     if (result != net::ERR_SOCKET_NOT_CONNECTED) {
181       // Don't change DSCP in next send.
182       next_dscp_value_ = net::DSCP_NO_CHANGE;
183     }
184   }
185 
186   scoped_refptr<net::IOBuffer> buf =
187       new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
188 
189   int result;
190   base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
191                                                   weak_factory_.GetWeakPtr(),
192                                                   buf,
193                                                   packet,
194                                                   cb);
195   if (client_connected_) {
196     // If we called Connect() before we must call Write() instead of
197     // SendTo(). Otherwise on some platforms we might get
198     // ERR_SOCKET_IS_CONNECTED.
199     result = udp_socket_->Write(
200         buf.get(), static_cast<int>(packet->data.size()), callback);
201   } else if (!IsEmpty(remote_addr_)) {
202     result = udp_socket_->SendTo(buf.get(),
203                                  static_cast<int>(packet->data.size()),
204                                  remote_addr_,
205                                  callback);
206   } else {
207     VLOG(1) << "Failed to send packet; socket is neither bound nor "
208             << "connected.";
209     return true;
210   }
211 
212   if (result == net::ERR_IO_PENDING) {
213     send_pending_ = true;
214     return false;
215   }
216   OnSent(buf, packet, base::Closure(), result);
217   return true;
218 }
219 
GetBytesSent()220 int64 UdpTransport::GetBytesSent() {
221   return bytes_sent_;
222 }
223 
OnSent(const scoped_refptr<net::IOBuffer> & buf,PacketRef packet,const base::Closure & cb,int result)224 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
225                           PacketRef packet,
226                           const base::Closure& cb,
227                           int result) {
228   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
229 
230   send_pending_ = false;
231   if (result < 0) {
232     VLOG(1) << "Failed to send packet: " << result << ".";
233   }
234   ScheduleReceiveNextPacket();
235 
236   if (!cb.is_null()) {
237     cb.Run();
238   }
239 }
240 
241 }  // namespace cast
242 }  // namespace media
243