1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/p2p/base/tcpport.h"
29
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/p2p/base/common.h"
33
34 namespace cricket {
35
TCPPort(talk_base::Thread * thread,talk_base::PacketSocketFactory * factory,talk_base::Network * network,uint32 ip,int min_port,int max_port,bool allow_listen)36 TCPPort::TCPPort(talk_base::Thread* thread,
37 talk_base::PacketSocketFactory* factory,
38 talk_base::Network* network, uint32 ip,
39 int min_port, int max_port, bool allow_listen)
40 : Port(thread, LOCAL_PORT_TYPE, factory, network, ip, min_port, max_port),
41 incoming_only_(false),
42 allow_listen_(allow_listen),
43 socket_(NULL),
44 error_(0) {
45 }
46
Init()47 bool TCPPort::Init() {
48 // Treat failure to create or bind a TCP socket as fatal. This
49 // should never happen.
50 socket_ = factory_->CreateServerTcpSocket(
51 talk_base::SocketAddress(ip_, 0), min_port_, max_port_, allow_listen_,
52 false /* ssl */);
53 if (!socket_) {
54 LOG_J(LS_ERROR, this) << "TCP socket creation failed.";
55 return false;
56 }
57 socket_->SignalNewConnection.connect(this, &TCPPort::OnNewConnection);
58 return true;
59 }
60
~TCPPort()61 TCPPort::~TCPPort() {
62 delete socket_;
63 }
64
CreateConnection(const Candidate & address,CandidateOrigin origin)65 Connection* TCPPort::CreateConnection(const Candidate& address,
66 CandidateOrigin origin) {
67 // We only support TCP protocols
68 if ((address.protocol() != "tcp") && (address.protocol() != "ssltcp"))
69 return NULL;
70
71 // We can't accept TCP connections incoming on other ports
72 if (origin == ORIGIN_OTHER_PORT)
73 return NULL;
74
75 // Check if we are allowed to make outgoing TCP connections
76 if (incoming_only_ && (origin == ORIGIN_MESSAGE))
77 return NULL;
78
79 // We don't know how to act as an ssl server yet
80 if ((address.protocol() == "ssltcp") && (origin == ORIGIN_THIS_PORT))
81 return NULL;
82
83 TCPConnection* conn = NULL;
84 if (talk_base::AsyncPacketSocket* socket =
85 GetIncoming(address.address(), true)) {
86 socket->SignalReadPacket.disconnect(this);
87 conn = new TCPConnection(this, address, socket);
88 } else {
89 conn = new TCPConnection(this, address);
90 }
91 AddConnection(conn);
92 return conn;
93 }
94
PrepareAddress()95 void TCPPort::PrepareAddress() {
96 if (!allow_listen_) {
97 LOG_J(LS_INFO, this) << "Not listening due to firewall restrictions.";
98 }
99 // Note: We still add the address, since otherwise the remote side won't
100 // recognize our incoming TCP connections.
101 bool allocated;
102 talk_base::SocketAddress address = socket_->GetLocalAddress(&allocated);
103 if (allocated) {
104 AddAddress(address, "tcp", true);
105 } else {
106 socket_->SignalAddressReady.connect(this, &TCPPort::OnAddresReady);
107 }
108 }
109
SendTo(const void * data,size_t size,const talk_base::SocketAddress & addr,bool payload)110 int TCPPort::SendTo(const void* data, size_t size,
111 const talk_base::SocketAddress& addr, bool payload) {
112 talk_base::AsyncPacketSocket * socket = NULL;
113 if (TCPConnection * conn = static_cast<TCPConnection*>(GetConnection(addr))) {
114 socket = conn->socket();
115 } else {
116 socket = GetIncoming(addr);
117 }
118 if (!socket) {
119 LOG_J(LS_ERROR, this) << "Attempted to send to an unknown destination, "
120 << addr.ToString();
121 return -1; // TODO: Set error_
122 }
123
124 int sent = socket->Send(data, size);
125 if (sent < 0) {
126 error_ = socket->GetError();
127 LOG_J(LS_ERROR, this) << "TCP send of " << size
128 << " bytes failed with error " << error_;
129 }
130 return sent;
131 }
132
SetOption(talk_base::Socket::Option opt,int value)133 int TCPPort::SetOption(talk_base::Socket::Option opt, int value) {
134 return socket_->SetOption(opt, value);
135 }
136
GetError()137 int TCPPort::GetError() {
138 return error_;
139 }
140
OnNewConnection(talk_base::AsyncPacketSocket * socket,talk_base::AsyncPacketSocket * new_socket)141 void TCPPort::OnNewConnection(talk_base::AsyncPacketSocket* socket,
142 talk_base::AsyncPacketSocket* new_socket) {
143 ASSERT(socket == socket_);
144
145 Incoming incoming;
146 incoming.addr = new_socket->GetRemoteAddress();
147 incoming.socket = new_socket;
148 incoming.socket->SignalReadPacket.connect(this, &TCPPort::OnReadPacket);
149
150 LOG_J(LS_VERBOSE, this) << "Accepted connection from "
151 << incoming.addr.ToString();
152 incoming_.push_back(incoming);
153 }
154
GetIncoming(const talk_base::SocketAddress & addr,bool remove)155 talk_base::AsyncPacketSocket* TCPPort::GetIncoming(
156 const talk_base::SocketAddress& addr, bool remove) {
157 talk_base::AsyncPacketSocket* socket = NULL;
158 for (std::list<Incoming>::iterator it = incoming_.begin();
159 it != incoming_.end(); ++it) {
160 if (it->addr == addr) {
161 socket = it->socket;
162 if (remove)
163 incoming_.erase(it);
164 break;
165 }
166 }
167 return socket;
168 }
169
OnReadPacket(talk_base::AsyncPacketSocket * socket,const char * data,size_t size,const talk_base::SocketAddress & remote_addr)170 void TCPPort::OnReadPacket(talk_base::AsyncPacketSocket* socket,
171 const char* data, size_t size,
172 const talk_base::SocketAddress& remote_addr) {
173 Port::OnReadPacket(data, size, remote_addr);
174 }
175
OnAddresReady(talk_base::AsyncPacketSocket * socket,const talk_base::SocketAddress & address)176 void TCPPort::OnAddresReady(talk_base::AsyncPacketSocket* socket,
177 const talk_base::SocketAddress& address) {
178 AddAddress(address, "tcp", true);
179 }
180
TCPConnection(TCPPort * port,const Candidate & candidate,talk_base::AsyncPacketSocket * socket)181 TCPConnection::TCPConnection(TCPPort* port, const Candidate& candidate,
182 talk_base::AsyncPacketSocket* socket)
183 : Connection(port, 0, candidate), socket_(socket), error_(0) {
184 bool outgoing = (socket_ == NULL);
185 if (outgoing) {
186 // TODO: Handle failures here (unlikely since TCP).
187
188 socket_ = port->socket_factory()->CreateClientTcpSocket(
189 talk_base::SocketAddress(port_->network()->ip(), 0),
190 candidate.address(), port->proxy(), port->user_agent(),
191 candidate.protocol() == "ssltcp");
192 if (socket_) {
193 LOG_J(LS_VERBOSE, this) << "Connecting from "
194 << socket_->GetLocalAddress(NULL).ToString()
195 << " to " << candidate.address().ToString();
196 set_connected(false);
197 socket_->SignalConnect.connect(this, &TCPConnection::OnConnect);
198 } else {
199 LOG_J(LS_WARNING, this) << "Failed to create connection to "
200 << candidate.address().ToString();
201 }
202 } else {
203 // Incoming connections should match the network address.
204 ASSERT(socket_->GetLocalAddress(NULL).ip() == port->ip_);
205 }
206
207 if (socket_) {
208 socket_->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
209 socket_->SignalClose.connect(this, &TCPConnection::OnClose);
210 }
211 }
212
~TCPConnection()213 TCPConnection::~TCPConnection() {
214 delete socket_;
215 }
216
Send(const void * data,size_t size)217 int TCPConnection::Send(const void* data, size_t size) {
218 if (!socket_) {
219 error_ = ENOTCONN;
220 return SOCKET_ERROR;
221 }
222
223 if (write_state() != STATE_WRITABLE) {
224 // TODO: Should STATE_WRITE_TIMEOUT return a non-blocking error?
225 error_ = EWOULDBLOCK;
226 return SOCKET_ERROR;
227 }
228 int sent = socket_->Send(data, size);
229 if (sent < 0) {
230 error_ = socket_->GetError();
231 } else {
232 send_rate_tracker_.Update(sent);
233 }
234 return sent;
235 }
236
GetError()237 int TCPConnection::GetError() {
238 return error_;
239 }
240
OnConnect(talk_base::AsyncPacketSocket * socket)241 void TCPConnection::OnConnect(talk_base::AsyncPacketSocket* socket) {
242 ASSERT(socket == socket_);
243 LOG_J(LS_VERBOSE, this) << "Connection established to "
244 << socket->GetRemoteAddress().ToString();
245 set_connected(true);
246 }
247
OnClose(talk_base::AsyncPacketSocket * socket,int error)248 void TCPConnection::OnClose(talk_base::AsyncPacketSocket* socket, int error) {
249 ASSERT(socket == socket_);
250 LOG_J(LS_VERBOSE, this) << "Connection closed with error " << error;
251 set_connected(false);
252 set_write_state(STATE_WRITE_TIMEOUT);
253 }
254
OnReadPacket(talk_base::AsyncPacketSocket * socket,const char * data,size_t size,const talk_base::SocketAddress & remote_addr)255 void TCPConnection::OnReadPacket(talk_base::AsyncPacketSocket* socket,
256 const char* data, size_t size,
257 const talk_base::SocketAddress& remote_addr) {
258 ASSERT(socket == socket_);
259 Connection::OnReadPacket(data, size);
260 }
261
262 } // namespace cricket
263