• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2012, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 #include "talk/app/webrtc/datachannel.h"
28 
29 #include <string>
30 
31 #include "talk/app/webrtc/mediastreamprovider.h"
32 #include "talk/base/logging.h"
33 #include "talk/base/refcount.h"
34 #include "talk/media/sctp/sctputils.h"
35 
36 namespace webrtc {
37 
38 static size_t kMaxQueuedReceivedDataPackets = 100;
39 static size_t kMaxQueuedSendDataPackets = 100;
40 
41 enum {
42   MSG_CHANNELREADY,
43 };
44 
Create(DataChannelProviderInterface * provider,cricket::DataChannelType dct,const std::string & label,const DataChannelInit * config)45 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
46     DataChannelProviderInterface* provider,
47     cricket::DataChannelType dct,
48     const std::string& label,
49     const DataChannelInit* config) {
50   talk_base::scoped_refptr<DataChannel> channel(
51       new talk_base::RefCountedObject<DataChannel>(provider, dct, label));
52   if (!channel->Init(config)) {
53     return NULL;
54   }
55   return channel;
56 }
57 
DataChannel(DataChannelProviderInterface * provider,cricket::DataChannelType dct,const std::string & label)58 DataChannel::DataChannel(
59     DataChannelProviderInterface* provider,
60     cricket::DataChannelType dct,
61     const std::string& label)
62     : label_(label),
63       observer_(NULL),
64       state_(kConnecting),
65       was_ever_writable_(false),
66       connected_to_provider_(false),
67       data_channel_type_(dct),
68       provider_(provider),
69       send_ssrc_set_(false),
70       send_ssrc_(0),
71       receive_ssrc_set_(false),
72       receive_ssrc_(0) {
73 }
74 
Init(const DataChannelInit * config)75 bool DataChannel::Init(const DataChannelInit* config) {
76   if (data_channel_type_ == cricket::DCT_RTP &&
77       (config->reliable ||
78        config->id != -1 ||
79        config->maxRetransmits != -1 ||
80        config->maxRetransmitTime != -1)) {
81     LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
82                   << "invalid DataChannelInit.";
83     return false;
84   } else if (data_channel_type_ == cricket::DCT_SCTP) {
85     if (config->id < -1 ||
86         config->maxRetransmits < -1 ||
87         config->maxRetransmitTime < -1) {
88       LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
89                     << "invalid DataChannelInit.";
90       return false;
91     }
92     if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
93       LOG(LS_ERROR) <<
94           "maxRetransmits and maxRetransmitTime should not be both set.";
95       return false;
96     }
97     config_ = *config;
98 
99     // Try to connect to the transport in case the transport channel already
100     // exists.
101     OnTransportChannelCreated();
102 
103     // Checks if the transport is ready to send because the initial channel
104     // ready signal may have been sent before the DataChannel creation.
105     // This has to be done async because the upper layer objects (e.g.
106     // Chrome glue and WebKit) are not wired up properly until after this
107     // function returns.
108     if (provider_->ReadyToSendData()) {
109       talk_base::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
110     }
111   }
112 
113   return true;
114 }
115 
~DataChannel()116 DataChannel::~DataChannel() {
117   ClearQueuedReceivedData();
118   ClearQueuedSendData();
119   ClearQueuedControlData();
120 }
121 
RegisterObserver(DataChannelObserver * observer)122 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
123   observer_ = observer;
124   DeliverQueuedReceivedData();
125 }
126 
UnregisterObserver()127 void DataChannel::UnregisterObserver() {
128   observer_ = NULL;
129 }
130 
reliable() const131 bool DataChannel::reliable() const {
132   if (data_channel_type_ == cricket::DCT_RTP) {
133     return false;
134   } else {
135     return config_.maxRetransmits == -1 &&
136            config_.maxRetransmitTime == -1;
137   }
138 }
139 
buffered_amount() const140 uint64 DataChannel::buffered_amount() const {
141   uint64 buffered_amount = 0;
142   for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
143       it != queued_send_data_.end();
144       ++it) {
145     buffered_amount += (*it)->size();
146   }
147   return buffered_amount;
148 }
149 
Close()150 void DataChannel::Close() {
151   if (state_ == kClosed)
152     return;
153   send_ssrc_ = 0;
154   send_ssrc_set_ = false;
155   SetState(kClosing);
156   UpdateState();
157 }
158 
Send(const DataBuffer & buffer)159 bool DataChannel::Send(const DataBuffer& buffer) {
160   if (state_ != kOpen) {
161     return false;
162   }
163   // If the queue is non-empty, we're waiting for SignalReadyToSend,
164   // so just add to the end of the queue and keep waiting.
165   if (!queued_send_data_.empty()) {
166     return QueueSendData(buffer);
167   }
168 
169   cricket::SendDataResult send_result;
170   if (!InternalSendWithoutQueueing(buffer, &send_result)) {
171     if (send_result == cricket::SDR_BLOCK) {
172       return QueueSendData(buffer);
173     }
174     // Fail for other results.
175     // TODO(jiayl): We should close the data channel in this case.
176     return false;
177   }
178   return true;
179 }
180 
QueueControl(const talk_base::Buffer * buffer)181 void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
182   queued_control_data_.push(buffer);
183 }
184 
SendOpenMessage(const talk_base::Buffer * raw_buffer)185 bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
186   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
187          was_ever_writable_ &&
188          config_.id >= 0 &&
189          !config_.negotiated);
190 
191   talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
192 
193   cricket::SendDataParams send_params;
194   send_params.ssrc = config_.id;
195   send_params.ordered = true;
196   send_params.type = cricket::DMT_CONTROL;
197 
198   cricket::SendDataResult send_result;
199   bool retval = provider_->SendData(send_params, *buffer, &send_result);
200   if (!retval && send_result == cricket::SDR_BLOCK) {
201     // Link is congested.  Queue for later.
202     QueueControl(buffer.release());
203   }
204   return retval;
205 }
206 
SetReceiveSsrc(uint32 receive_ssrc)207 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
208   ASSERT(data_channel_type_ == cricket::DCT_RTP);
209 
210   if (receive_ssrc_set_) {
211     return;
212   }
213   receive_ssrc_ = receive_ssrc;
214   receive_ssrc_set_ = true;
215   UpdateState();
216 }
217 
218 // The remote peer request that this channel shall be closed.
RemotePeerRequestClose()219 void DataChannel::RemotePeerRequestClose() {
220   DoClose();
221 }
222 
SetSendSsrc(uint32 send_ssrc)223 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
224   ASSERT(data_channel_type_ == cricket::DCT_RTP);
225   if (send_ssrc_set_) {
226     return;
227   }
228   send_ssrc_ = send_ssrc;
229   send_ssrc_set_ = true;
230   UpdateState();
231 }
232 
OnMessage(talk_base::Message * msg)233 void DataChannel::OnMessage(talk_base::Message* msg) {
234   switch (msg->message_id) {
235     case MSG_CHANNELREADY:
236       OnChannelReady(true);
237       break;
238   }
239 }
240 
241 // The underlaying data engine is closing.
242 // This function makes sure the DataChannel is disconnected and changes state to
243 // kClosed.
OnDataEngineClose()244 void DataChannel::OnDataEngineClose() {
245   DoClose();
246 }
247 
OnDataReceived(cricket::DataChannel * channel,const cricket::ReceiveDataParams & params,const talk_base::Buffer & payload)248 void DataChannel::OnDataReceived(cricket::DataChannel* channel,
249                                  const cricket::ReceiveDataParams& params,
250                                  const talk_base::Buffer& payload) {
251   uint32 expected_ssrc =
252       (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
253   if (params.ssrc != expected_ssrc) {
254     return;
255   }
256 
257   bool binary = (params.type == cricket::DMT_BINARY);
258   talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
259   if (was_ever_writable_ && observer_) {
260     observer_->OnMessage(*buffer.get());
261   } else {
262     if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
263       // TODO(jiayl): We should close the data channel in this case.
264       LOG(LS_ERROR)
265           << "Queued received data exceeds the max number of packes.";
266       ClearQueuedReceivedData();
267     }
268     queued_received_data_.push(buffer.release());
269   }
270 }
271 
OnChannelReady(bool writable)272 void DataChannel::OnChannelReady(bool writable) {
273   if (!writable) {
274     return;
275   }
276   // Update the readyState and send the queued control message if the channel
277   // is writable for the first time; otherwise it means the channel was blocked
278   // for sending and now unblocked, so send the queued data now.
279   if (!was_ever_writable_) {
280     was_ever_writable_ = true;
281 
282     if (data_channel_type_ == cricket::DCT_SCTP && !config_.negotiated) {
283       talk_base::Buffer* payload = new talk_base::Buffer;
284       if (!cricket::WriteDataChannelOpenMessage(label_, config_, payload)) {
285         // TODO(jiayl): close the data channel on this error.
286         LOG(LS_ERROR) << "Could not write data channel OPEN message";
287         return;
288       }
289       SendOpenMessage(payload);
290     }
291 
292     UpdateState();
293     ASSERT(queued_send_data_.empty());
294   } else if (state_ == kOpen) {
295     DeliverQueuedSendData();
296   }
297 }
298 
DoClose()299 void DataChannel::DoClose() {
300   receive_ssrc_set_ = false;
301   send_ssrc_set_ = false;
302   SetState(kClosing);
303   UpdateState();
304 }
305 
UpdateState()306 void DataChannel::UpdateState() {
307   switch (state_) {
308     case kConnecting: {
309       if (send_ssrc_set_ == receive_ssrc_set_) {
310         if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
311           connected_to_provider_ = provider_->ConnectDataChannel(this);
312         }
313         if (was_ever_writable_) {
314           // TODO(jiayl): Do not transition to kOpen if we failed to send the
315           // OPEN message.
316           DeliverQueuedControlData();
317           SetState(kOpen);
318           // If we have received buffers before the channel got writable.
319           // Deliver them now.
320           DeliverQueuedReceivedData();
321         }
322       }
323       break;
324     }
325     case kOpen: {
326       break;
327     }
328     case kClosing: {
329       DisconnectFromTransport();
330 
331       if (!send_ssrc_set_ && !receive_ssrc_set_) {
332         SetState(kClosed);
333       }
334       break;
335     }
336     case kClosed:
337       break;
338   }
339 }
340 
SetState(DataState state)341 void DataChannel::SetState(DataState state) {
342   state_ = state;
343   if (observer_) {
344     observer_->OnStateChange();
345   }
346 }
347 
DisconnectFromTransport()348 void DataChannel::DisconnectFromTransport() {
349   if (!connected_to_provider_)
350     return;
351 
352   provider_->DisconnectDataChannel(this);
353   connected_to_provider_ = false;
354 
355   if (data_channel_type_ == cricket::DCT_SCTP) {
356     provider_->RemoveSctpDataStream(config_.id);
357   }
358 }
359 
DeliverQueuedReceivedData()360 void DataChannel::DeliverQueuedReceivedData() {
361   if (!was_ever_writable_ || !observer_) {
362     return;
363   }
364 
365   while (!queued_received_data_.empty()) {
366     DataBuffer* buffer = queued_received_data_.front();
367     observer_->OnMessage(*buffer);
368     queued_received_data_.pop();
369     delete buffer;
370   }
371 }
372 
ClearQueuedReceivedData()373 void DataChannel::ClearQueuedReceivedData() {
374   while (!queued_received_data_.empty()) {
375     DataBuffer* buffer = queued_received_data_.front();
376     queued_received_data_.pop();
377     delete buffer;
378   }
379 }
380 
DeliverQueuedSendData()381 void DataChannel::DeliverQueuedSendData() {
382   ASSERT(was_ever_writable_ && state_ == kOpen);
383 
384   // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
385   // that the readyState is open. According to the standard, the channel should
386   // not become open before the OPEN message is sent.
387   DeliverQueuedControlData();
388 
389   while (!queued_send_data_.empty()) {
390     DataBuffer* buffer = queued_send_data_.front();
391     cricket::SendDataResult send_result;
392     if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
393       LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
394                       << send_result;
395       break;
396     }
397     queued_send_data_.pop_front();
398     delete buffer;
399   }
400 }
401 
ClearQueuedControlData()402 void DataChannel::ClearQueuedControlData() {
403   while (!queued_control_data_.empty()) {
404     const talk_base::Buffer *buf = queued_control_data_.front();
405     queued_control_data_.pop();
406     delete buf;
407   }
408 }
409 
DeliverQueuedControlData()410 void DataChannel::DeliverQueuedControlData() {
411   ASSERT(was_ever_writable_);
412   while (!queued_control_data_.empty()) {
413     const talk_base::Buffer* buf = queued_control_data_.front();
414     queued_control_data_.pop();
415     SendOpenMessage(buf);
416   }
417 }
418 
ClearQueuedSendData()419 void DataChannel::ClearQueuedSendData() {
420   while (!queued_send_data_.empty()) {
421     DataBuffer* buffer = queued_send_data_.front();
422     queued_send_data_.pop_front();
423     delete buffer;
424   }
425 }
426 
InternalSendWithoutQueueing(const DataBuffer & buffer,cricket::SendDataResult * send_result)427 bool DataChannel::InternalSendWithoutQueueing(
428     const DataBuffer& buffer, cricket::SendDataResult* send_result) {
429   cricket::SendDataParams send_params;
430 
431   if (data_channel_type_ == cricket::DCT_SCTP) {
432     send_params.ordered = config_.ordered;
433     send_params.max_rtx_count = config_.maxRetransmits;
434     send_params.max_rtx_ms = config_.maxRetransmitTime;
435     send_params.ssrc = config_.id;
436   } else {
437     send_params.ssrc = send_ssrc_;
438   }
439   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
440 
441   return provider_->SendData(send_params, buffer.data, send_result);
442 }
443 
QueueSendData(const DataBuffer & buffer)444 bool DataChannel::QueueSendData(const DataBuffer& buffer) {
445   if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
446     LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
447     return false;
448   }
449   queued_send_data_.push_back(new DataBuffer(buffer));
450   return true;
451 }
452 
SetSctpSid(int sid)453 void DataChannel::SetSctpSid(int sid) {
454   ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
455   config_.id = sid;
456   provider_->AddSctpDataStream(sid);
457 }
458 
OnTransportChannelCreated()459 void DataChannel::OnTransportChannelCreated() {
460   ASSERT(data_channel_type_ == cricket::DCT_SCTP);
461   if (!connected_to_provider_) {
462     connected_to_provider_ = provider_->ConnectDataChannel(this);
463   }
464   // The sid may have been unassigned when provider_->ConnectDataChannel was
465   // done. So always add the streams even if connected_to_provider_ is true.
466   if (config_.id >= 0) {
467     provider_->AddSctpDataStream(config_.id);
468   }
469 }
470 
471 }  // namespace webrtc
472