1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "jingle/glue/channel_socket_adapter.h"
6
7 #include <limits>
8
9 #include "base/callback.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
15
16 namespace jingle_glue {
17
TransportChannelSocketAdapter(cricket::TransportChannel * channel)18 TransportChannelSocketAdapter::TransportChannelSocketAdapter(
19 cricket::TransportChannel* channel)
20 : message_loop_(base::MessageLoop::current()),
21 channel_(channel),
22 closed_error_code_(net::OK) {
23 DCHECK(channel_);
24
25 channel_->SignalReadPacket.connect(
26 this, &TransportChannelSocketAdapter::OnNewPacket);
27 channel_->SignalWritableState.connect(
28 this, &TransportChannelSocketAdapter::OnWritableState);
29 channel_->SignalDestroyed.connect(
30 this, &TransportChannelSocketAdapter::OnChannelDestroyed);
31 }
32
~TransportChannelSocketAdapter()33 TransportChannelSocketAdapter::~TransportChannelSocketAdapter() {
34 if (!destruction_callback_.is_null())
35 destruction_callback_.Run();
36 }
37
SetOnDestroyedCallback(const base::Closure & callback)38 void TransportChannelSocketAdapter::SetOnDestroyedCallback(
39 const base::Closure& callback) {
40 destruction_callback_ = callback;
41 }
42
Read(net::IOBuffer * buf,int buffer_size,const net::CompletionCallback & callback)43 int TransportChannelSocketAdapter::Read(
44 net::IOBuffer* buf,
45 int buffer_size,
46 const net::CompletionCallback& callback) {
47 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
48 DCHECK(buf);
49 DCHECK(!callback.is_null());
50 CHECK(read_callback_.is_null());
51
52 if (!channel_) {
53 DCHECK(closed_error_code_ != net::OK);
54 return closed_error_code_;
55 }
56
57 read_callback_ = callback;
58 read_buffer_ = buf;
59 read_buffer_size_ = buffer_size;
60
61 return net::ERR_IO_PENDING;
62 }
63
Write(net::IOBuffer * buffer,int buffer_size,const net::CompletionCallback & callback)64 int TransportChannelSocketAdapter::Write(
65 net::IOBuffer* buffer,
66 int buffer_size,
67 const net::CompletionCallback& callback) {
68 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
69 DCHECK(buffer);
70 DCHECK(!callback.is_null());
71 CHECK(write_callback_.is_null());
72
73 if (!channel_) {
74 DCHECK(closed_error_code_ != net::OK);
75 return closed_error_code_;
76 }
77
78 int result;
79 if (channel_->writable()) {
80 result = channel_->SendPacket(buffer->data(), buffer_size,
81 talk_base::DSCP_NO_CHANGE);
82 if (result < 0) {
83 result = net::MapSystemError(channel_->GetError());
84
85 // If the underlying socket returns IO pending where it shouldn't we
86 // pretend the packet is dropped and return as succeeded because no
87 // writeable callback will happen.
88 if (result == net::ERR_IO_PENDING)
89 result = net::OK;
90 }
91 } else {
92 // Channel is not writable yet.
93 result = net::ERR_IO_PENDING;
94 write_callback_ = callback;
95 write_buffer_ = buffer;
96 write_buffer_size_ = buffer_size;
97 }
98
99 return result;
100 }
101
SetReceiveBufferSize(int32 size)102 bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) {
103 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
104 return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0;
105 }
106
SetSendBufferSize(int32 size)107 bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) {
108 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
109 return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0;
110 }
111
Close(int error_code)112 void TransportChannelSocketAdapter::Close(int error_code) {
113 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
114
115 if (!channel_) // Already closed.
116 return;
117
118 DCHECK(error_code != net::OK);
119 closed_error_code_ = error_code;
120 channel_->SignalReadPacket.disconnect(this);
121 channel_->SignalDestroyed.disconnect(this);
122 channel_ = NULL;
123
124 if (!read_callback_.is_null()) {
125 net::CompletionCallback callback = read_callback_;
126 read_callback_.Reset();
127 read_buffer_ = NULL;
128 callback.Run(error_code);
129 }
130
131 if (!write_callback_.is_null()) {
132 net::CompletionCallback callback = write_callback_;
133 write_callback_.Reset();
134 write_buffer_ = NULL;
135 callback.Run(error_code);
136 }
137 }
138
OnNewPacket(cricket::TransportChannel * channel,const char * data,size_t data_size,const talk_base::PacketTime & packet_time,int flags)139 void TransportChannelSocketAdapter::OnNewPacket(
140 cricket::TransportChannel* channel,
141 const char* data,
142 size_t data_size,
143 const talk_base::PacketTime& packet_time,
144 int flags) {
145 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
146 DCHECK_EQ(channel, channel_);
147 if (!read_callback_.is_null()) {
148 DCHECK(read_buffer_.get());
149 CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max()));
150
151 if (read_buffer_size_ < static_cast<int>(data_size)) {
152 LOG(WARNING) << "Data buffer is smaller than the received packet. "
153 << "Dropping the data that doesn't fit.";
154 data_size = read_buffer_size_;
155 }
156
157 memcpy(read_buffer_->data(), data, data_size);
158
159 net::CompletionCallback callback = read_callback_;
160 read_callback_.Reset();
161 read_buffer_ = NULL;
162
163 callback.Run(data_size);
164 } else {
165 LOG(WARNING)
166 << "Data was received without a callback. Dropping the packet.";
167 }
168 }
169
OnWritableState(cricket::TransportChannel * channel)170 void TransportChannelSocketAdapter::OnWritableState(
171 cricket::TransportChannel* channel) {
172 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
173 // Try to send the packet if there is a pending write.
174 if (!write_callback_.is_null()) {
175 int result = channel_->SendPacket(write_buffer_->data(),
176 write_buffer_size_,
177 talk_base::DSCP_NO_CHANGE);
178 if (result < 0)
179 result = net::MapSystemError(channel_->GetError());
180
181 if (result != net::ERR_IO_PENDING) {
182 net::CompletionCallback callback = write_callback_;
183 write_callback_.Reset();
184 write_buffer_ = NULL;
185 callback.Run(result);
186 }
187 }
188 }
189
OnChannelDestroyed(cricket::TransportChannel * channel)190 void TransportChannelSocketAdapter::OnChannelDestroyed(
191 cricket::TransportChannel* channel) {
192 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
193 DCHECK_EQ(channel, channel_);
194 Close(net::ERR_CONNECTION_ABORTED);
195 }
196
197 } // namespace jingle_glue
198