• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/p2p/base/tcpport.h"
29 
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/p2p/base/common.h"
33 
34 namespace cricket {
35 
TCPPort(talk_base::Thread * thread,talk_base::PacketSocketFactory * factory,talk_base::Network * network,uint32 ip,int min_port,int max_port,bool allow_listen)36 TCPPort::TCPPort(talk_base::Thread* thread,
37                  talk_base::PacketSocketFactory* factory,
38                  talk_base::Network* network, uint32 ip,
39                  int min_port, int max_port, bool allow_listen)
40     : Port(thread, LOCAL_PORT_TYPE, factory, network, ip, min_port, max_port),
41       incoming_only_(false),
42       allow_listen_(allow_listen),
43       socket_(NULL),
44       error_(0) {
45 }
46 
Init()47 bool TCPPort::Init() {
48   // Treat failure to create or bind a TCP socket as fatal.  This
49   // should never happen.
50   socket_ = factory_->CreateServerTcpSocket(
51       talk_base::SocketAddress(ip_, 0), min_port_, max_port_, allow_listen_,
52       false /* ssl */);
53   if (!socket_) {
54     LOG_J(LS_ERROR, this) << "TCP socket creation failed.";
55     return false;
56   }
57   socket_->SignalNewConnection.connect(this, &TCPPort::OnNewConnection);
58   return true;
59 }
60 
~TCPPort()61 TCPPort::~TCPPort() {
62   delete socket_;
63 }
64 
CreateConnection(const Candidate & address,CandidateOrigin origin)65 Connection* TCPPort::CreateConnection(const Candidate& address,
66                                       CandidateOrigin origin) {
67   // We only support TCP protocols
68   if ((address.protocol() != "tcp") && (address.protocol() != "ssltcp"))
69     return NULL;
70 
71   // We can't accept TCP connections incoming on other ports
72   if (origin == ORIGIN_OTHER_PORT)
73     return NULL;
74 
75   // Check if we are allowed to make outgoing TCP connections
76   if (incoming_only_ && (origin == ORIGIN_MESSAGE))
77     return NULL;
78 
79   // We don't know how to act as an ssl server yet
80   if ((address.protocol() == "ssltcp") && (origin == ORIGIN_THIS_PORT))
81     return NULL;
82 
83   TCPConnection* conn = NULL;
84   if (talk_base::AsyncPacketSocket* socket =
85       GetIncoming(address.address(), true)) {
86     socket->SignalReadPacket.disconnect(this);
87     conn = new TCPConnection(this, address, socket);
88   } else {
89     conn = new TCPConnection(this, address);
90   }
91   AddConnection(conn);
92   return conn;
93 }
94 
PrepareAddress()95 void TCPPort::PrepareAddress() {
96   if (!allow_listen_) {
97     LOG_J(LS_INFO, this) << "Not listening due to firewall restrictions.";
98   }
99   // Note: We still add the address, since otherwise the remote side won't
100   // recognize our incoming TCP connections.
101   bool allocated;
102   talk_base::SocketAddress address = socket_->GetLocalAddress(&allocated);
103   if (allocated) {
104     AddAddress(address, "tcp", true);
105   } else {
106     socket_->SignalAddressReady.connect(this, &TCPPort::OnAddresReady);
107   }
108 }
109 
SendTo(const void * data,size_t size,const talk_base::SocketAddress & addr,bool payload)110 int TCPPort::SendTo(const void* data, size_t size,
111                     const talk_base::SocketAddress& addr, bool payload) {
112   talk_base::AsyncPacketSocket * socket = NULL;
113   if (TCPConnection * conn = static_cast<TCPConnection*>(GetConnection(addr))) {
114     socket = conn->socket();
115   } else {
116     socket = GetIncoming(addr);
117   }
118   if (!socket) {
119     LOG_J(LS_ERROR, this) << "Attempted to send to an unknown destination, "
120                           << addr.ToString();
121     return -1;  // TODO: Set error_
122   }
123 
124   int sent = socket->Send(data, size);
125   if (sent < 0) {
126     error_ = socket->GetError();
127     LOG_J(LS_ERROR, this) << "TCP send of " << size
128                           << " bytes failed with error " << error_;
129   }
130   return sent;
131 }
132 
SetOption(talk_base::Socket::Option opt,int value)133 int TCPPort::SetOption(talk_base::Socket::Option opt, int value) {
134   return socket_->SetOption(opt, value);
135 }
136 
GetError()137 int TCPPort::GetError() {
138   return error_;
139 }
140 
OnNewConnection(talk_base::AsyncPacketSocket * socket,talk_base::AsyncPacketSocket * new_socket)141 void TCPPort::OnNewConnection(talk_base::AsyncPacketSocket* socket,
142                               talk_base::AsyncPacketSocket* new_socket) {
143   ASSERT(socket == socket_);
144 
145   Incoming incoming;
146   incoming.addr = new_socket->GetRemoteAddress();
147   incoming.socket = new_socket;
148   incoming.socket->SignalReadPacket.connect(this, &TCPPort::OnReadPacket);
149 
150   LOG_J(LS_VERBOSE, this) << "Accepted connection from "
151                           << incoming.addr.ToString();
152   incoming_.push_back(incoming);
153 }
154 
// Looks up the first queued incoming socket whose remote address equals
// |addr|.
//
// Returns that socket, or NULL when no entry matches.  When |remove| is true
// the matching entry is also erased from incoming_; the socket itself is not
// deleted and is returned to the caller.
talk_base::AsyncPacketSocket* TCPPort::GetIncoming(
    const talk_base::SocketAddress& addr, bool remove) {
  std::list<Incoming>::iterator match = incoming_.begin();
  while (match != incoming_.end() && !(match->addr == addr)) {
    ++match;
  }
  if (match == incoming_.end()) {
    return NULL;
  }

  // Read the pointer before erasing: erase() invalidates |match|.
  talk_base::AsyncPacketSocket* found = match->socket;
  if (remove) {
    incoming_.erase(match);
  }
  return found;
}
169 
OnReadPacket(talk_base::AsyncPacketSocket * socket,const char * data,size_t size,const talk_base::SocketAddress & remote_addr)170 void TCPPort::OnReadPacket(talk_base::AsyncPacketSocket* socket,
171                            const char* data, size_t size,
172                            const talk_base::SocketAddress& remote_addr) {
173   Port::OnReadPacket(data, size, remote_addr);
174 }
175 
OnAddresReady(talk_base::AsyncPacketSocket * socket,const talk_base::SocketAddress & address)176 void TCPPort::OnAddresReady(talk_base::AsyncPacketSocket* socket,
177                             const talk_base::SocketAddress& address) {
178   AddAddress(address, "tcp", true);
179 }
180 
TCPConnection(TCPPort * port,const Candidate & candidate,talk_base::AsyncPacketSocket * socket)181 TCPConnection::TCPConnection(TCPPort* port, const Candidate& candidate,
182                              talk_base::AsyncPacketSocket* socket)
183     : Connection(port, 0, candidate), socket_(socket), error_(0) {
184   bool outgoing = (socket_ == NULL);
185   if (outgoing) {
186     // TODO: Handle failures here (unlikely since TCP).
187 
188     socket_ = port->socket_factory()->CreateClientTcpSocket(
189         talk_base::SocketAddress(port_->network()->ip(), 0),
190         candidate.address(), port->proxy(), port->user_agent(),
191         candidate.protocol() == "ssltcp");
192     if (socket_) {
193       LOG_J(LS_VERBOSE, this) << "Connecting from "
194                               << socket_->GetLocalAddress(NULL).ToString()
195                               << " to " << candidate.address().ToString();
196       set_connected(false);
197       socket_->SignalConnect.connect(this, &TCPConnection::OnConnect);
198     } else {
199       LOG_J(LS_WARNING, this) << "Failed to create connection to "
200                               << candidate.address().ToString();
201     }
202   } else {
203     // Incoming connections should match the network address.
204     ASSERT(socket_->GetLocalAddress(NULL).ip() == port->ip_);
205   }
206 
207   if (socket_) {
208     socket_->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
209     socket_->SignalClose.connect(this, &TCPConnection::OnClose);
210   }
211 }
212 
~TCPConnection()213 TCPConnection::~TCPConnection() {
214   delete socket_;
215 }
216 
Send(const void * data,size_t size)217 int TCPConnection::Send(const void* data, size_t size) {
218   if (!socket_) {
219     error_ = ENOTCONN;
220     return SOCKET_ERROR;
221   }
222 
223   if (write_state() != STATE_WRITABLE) {
224     // TODO: Should STATE_WRITE_TIMEOUT return a non-blocking error?
225     error_ = EWOULDBLOCK;
226     return SOCKET_ERROR;
227   }
228   int sent = socket_->Send(data, size);
229   if (sent < 0) {
230     error_ = socket_->GetError();
231   } else {
232     send_rate_tracker_.Update(sent);
233   }
234   return sent;
235 }
236 
GetError()237 int TCPConnection::GetError() {
238   return error_;
239 }
240 
OnConnect(talk_base::AsyncPacketSocket * socket)241 void TCPConnection::OnConnect(talk_base::AsyncPacketSocket* socket) {
242   ASSERT(socket == socket_);
243   LOG_J(LS_VERBOSE, this) << "Connection established to "
244                           << socket->GetRemoteAddress().ToString();
245   set_connected(true);
246 }
247 
OnClose(talk_base::AsyncPacketSocket * socket,int error)248 void TCPConnection::OnClose(talk_base::AsyncPacketSocket* socket, int error) {
249   ASSERT(socket == socket_);
250   LOG_J(LS_VERBOSE, this) << "Connection closed with error " << error;
251   set_connected(false);
252   set_write_state(STATE_WRITE_TIMEOUT);
253 }
254 
OnReadPacket(talk_base::AsyncPacketSocket * socket,const char * data,size_t size,const talk_base::SocketAddress & remote_addr)255 void TCPConnection::OnReadPacket(talk_base::AsyncPacketSocket* socket,
256                                  const char* data, size_t size,
257                                  const talk_base::SocketAddress& remote_addr) {
258   ASSERT(socket == socket_);
259   Connection::OnReadPacket(data, size);
260 }
261 
262 }  // namespace cricket
263