• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "jingle/glue/channel_socket_adapter.h"
6 
7 #include <limits>
8 
9 #include "base/callback.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
15 
16 namespace jingle_glue {
17 
TransportChannelSocketAdapter(cricket::TransportChannel * channel)18 TransportChannelSocketAdapter::TransportChannelSocketAdapter(
19     cricket::TransportChannel* channel)
20     : message_loop_(base::MessageLoop::current()),
21       channel_(channel),
22       closed_error_code_(net::OK) {
23   DCHECK(channel_);
24 
25   channel_->SignalReadPacket.connect(
26       this, &TransportChannelSocketAdapter::OnNewPacket);
27   channel_->SignalWritableState.connect(
28       this, &TransportChannelSocketAdapter::OnWritableState);
29   channel_->SignalDestroyed.connect(
30       this, &TransportChannelSocketAdapter::OnChannelDestroyed);
31 }
32 
~TransportChannelSocketAdapter()33 TransportChannelSocketAdapter::~TransportChannelSocketAdapter() {
34   if (!destruction_callback_.is_null())
35     destruction_callback_.Run();
36 }
37 
SetOnDestroyedCallback(const base::Closure & callback)38 void TransportChannelSocketAdapter::SetOnDestroyedCallback(
39     const base::Closure& callback) {
40   destruction_callback_ = callback;
41 }
42 
Read(net::IOBuffer * buf,int buffer_size,const net::CompletionCallback & callback)43 int TransportChannelSocketAdapter::Read(
44     net::IOBuffer* buf,
45     int buffer_size,
46     const net::CompletionCallback& callback) {
47   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
48   DCHECK(buf);
49   DCHECK(!callback.is_null());
50   CHECK(read_callback_.is_null());
51 
52   if (!channel_) {
53     DCHECK(closed_error_code_ != net::OK);
54     return closed_error_code_;
55   }
56 
57   read_callback_ = callback;
58   read_buffer_ = buf;
59   read_buffer_size_ = buffer_size;
60 
61   return net::ERR_IO_PENDING;
62 }
63 
Write(net::IOBuffer * buffer,int buffer_size,const net::CompletionCallback & callback)64 int TransportChannelSocketAdapter::Write(
65     net::IOBuffer* buffer,
66     int buffer_size,
67     const net::CompletionCallback& callback) {
68   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
69   DCHECK(buffer);
70   DCHECK(!callback.is_null());
71   CHECK(write_callback_.is_null());
72 
73   if (!channel_) {
74     DCHECK(closed_error_code_ != net::OK);
75     return closed_error_code_;
76   }
77 
78   int result;
79   if (channel_->writable()) {
80     result = channel_->SendPacket(buffer->data(), buffer_size,
81                                   talk_base::DSCP_NO_CHANGE);
82     if (result < 0) {
83       result = net::MapSystemError(channel_->GetError());
84 
85       // If the underlying socket returns IO pending where it shouldn't we
86       // pretend the packet is dropped and return as succeeded because no
87       // writeable callback will happen.
88       if (result == net::ERR_IO_PENDING)
89         result = net::OK;
90     }
91   } else {
92     // Channel is not writable yet.
93     result = net::ERR_IO_PENDING;
94     write_callback_ = callback;
95     write_buffer_ = buffer;
96     write_buffer_size_ = buffer_size;
97   }
98 
99   return result;
100 }
101 
SetReceiveBufferSize(int32 size)102 bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) {
103   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
104   return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0;
105 }
106 
SetSendBufferSize(int32 size)107 bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) {
108   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
109   return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0;
110 }
111 
Close(int error_code)112 void TransportChannelSocketAdapter::Close(int error_code) {
113   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
114 
115   if (!channel_)  // Already closed.
116     return;
117 
118   DCHECK(error_code != net::OK);
119   closed_error_code_ = error_code;
120   channel_->SignalReadPacket.disconnect(this);
121   channel_->SignalDestroyed.disconnect(this);
122   channel_ = NULL;
123 
124   if (!read_callback_.is_null()) {
125     net::CompletionCallback callback = read_callback_;
126     read_callback_.Reset();
127     read_buffer_ = NULL;
128     callback.Run(error_code);
129   }
130 
131   if (!write_callback_.is_null()) {
132     net::CompletionCallback callback = write_callback_;
133     write_callback_.Reset();
134     write_buffer_ = NULL;
135     callback.Run(error_code);
136   }
137 }
138 
OnNewPacket(cricket::TransportChannel * channel,const char * data,size_t data_size,const talk_base::PacketTime & packet_time,int flags)139 void TransportChannelSocketAdapter::OnNewPacket(
140     cricket::TransportChannel* channel,
141     const char* data,
142     size_t data_size,
143     const talk_base::PacketTime& packet_time,
144     int flags) {
145   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
146   DCHECK_EQ(channel, channel_);
147   if (!read_callback_.is_null()) {
148     DCHECK(read_buffer_.get());
149     CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max()));
150 
151     if (read_buffer_size_ < static_cast<int>(data_size)) {
152       LOG(WARNING) << "Data buffer is smaller than the received packet. "
153                    << "Dropping the data that doesn't fit.";
154       data_size = read_buffer_size_;
155     }
156 
157     memcpy(read_buffer_->data(), data, data_size);
158 
159     net::CompletionCallback callback = read_callback_;
160     read_callback_.Reset();
161     read_buffer_ = NULL;
162 
163     callback.Run(data_size);
164   } else {
165     LOG(WARNING)
166         << "Data was received without a callback. Dropping the packet.";
167   }
168 }
169 
OnWritableState(cricket::TransportChannel * channel)170 void TransportChannelSocketAdapter::OnWritableState(
171     cricket::TransportChannel* channel) {
172   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
173   // Try to send the packet if there is a pending write.
174   if (!write_callback_.is_null()) {
175     int result = channel_->SendPacket(write_buffer_->data(),
176                                       write_buffer_size_,
177                                       talk_base::DSCP_NO_CHANGE);
178     if (result < 0)
179       result = net::MapSystemError(channel_->GetError());
180 
181     if (result != net::ERR_IO_PENDING) {
182       net::CompletionCallback callback = write_callback_;
183       write_callback_.Reset();
184       write_buffer_ = NULL;
185       callback.Run(result);
186     }
187   }
188 }
189 
OnChannelDestroyed(cricket::TransportChannel * channel)190 void TransportChannelSocketAdapter::OnChannelDestroyed(
191     cricket::TransportChannel* channel) {
192   DCHECK_EQ(base::MessageLoop::current(), message_loop_);
193   DCHECK_EQ(channel, channel_);
194   Close(net::ERR_CONNECTION_ABORTED);
195 }
196 
197 }  // namespace jingle_glue
198