• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2012, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 #include "talk/app/webrtc/datachannel.h"
28 
29 #include <string>
30 
31 #include "talk/app/webrtc/mediastreamprovider.h"
32 #include "talk/app/webrtc/sctputils.h"
33 #include "webrtc/base/logging.h"
34 #include "webrtc/base/refcount.h"
35 
36 namespace webrtc {
37 
// Upper bounds, in bytes, on the data a channel may buffer: received messages
// waiting to be delivered to an observer, and outgoing messages waiting for
// the transport to unblock. Nothing modifies these limits at run time, so they
// are declared const.
static const size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
static const size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
40 
// Ids of the messages a DataChannel posts to its own message handler.
enum {
  // Posted from Init(); handled in OnMessage() by calling OnChannelReady(true).
  MSG_CHANNELREADY = 0
};
44 
PacketQueue()45 DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {}
46 
~PacketQueue()47 DataChannel::PacketQueue::~PacketQueue() {
48   Clear();
49 }
50 
Empty() const51 bool DataChannel::PacketQueue::Empty() const {
52   return packets_.empty();
53 }
54 
Front()55 DataBuffer* DataChannel::PacketQueue::Front() {
56   return packets_.front();
57 }
58 
Pop()59 void DataChannel::PacketQueue::Pop() {
60   if (packets_.empty()) {
61     return;
62   }
63 
64   byte_count_ -= packets_.front()->size();
65   packets_.pop_front();
66 }
67 
Push(DataBuffer * packet)68 void DataChannel::PacketQueue::Push(DataBuffer* packet) {
69   byte_count_ += packet->size();
70   packets_.push_back(packet);
71 }
72 
Clear()73 void DataChannel::PacketQueue::Clear() {
74   while (!packets_.empty()) {
75     delete packets_.front();
76     packets_.pop_front();
77   }
78   byte_count_ = 0;
79 }
80 
Swap(PacketQueue * other)81 void DataChannel::PacketQueue::Swap(PacketQueue* other) {
82   size_t other_byte_count = other->byte_count_;
83   other->byte_count_ = byte_count_;
84   byte_count_ = other_byte_count;
85 
86   other->packets_.swap(packets_);
87 }
88 
Create(DataChannelProviderInterface * provider,cricket::DataChannelType dct,const std::string & label,const InternalDataChannelInit & config)89 rtc::scoped_refptr<DataChannel> DataChannel::Create(
90     DataChannelProviderInterface* provider,
91     cricket::DataChannelType dct,
92     const std::string& label,
93     const InternalDataChannelInit& config) {
94   rtc::scoped_refptr<DataChannel> channel(
95       new rtc::RefCountedObject<DataChannel>(provider, dct, label));
96   if (!channel->Init(config)) {
97     return NULL;
98   }
99   return channel;
100 }
101 
DataChannel(DataChannelProviderInterface * provider,cricket::DataChannelType dct,const std::string & label)102 DataChannel::DataChannel(
103     DataChannelProviderInterface* provider,
104     cricket::DataChannelType dct,
105     const std::string& label)
106     : label_(label),
107       observer_(NULL),
108       state_(kConnecting),
109       data_channel_type_(dct),
110       provider_(provider),
111       waiting_for_open_ack_(false),
112       was_ever_writable_(false),
113       connected_to_provider_(false),
114       send_ssrc_set_(false),
115       receive_ssrc_set_(false),
116       send_ssrc_(0),
117       receive_ssrc_(0) {
118 }
119 
Init(const InternalDataChannelInit & config)120 bool DataChannel::Init(const InternalDataChannelInit& config) {
121   if (data_channel_type_ == cricket::DCT_RTP &&
122       (config.reliable ||
123        config.id != -1 ||
124        config.maxRetransmits != -1 ||
125        config.maxRetransmitTime != -1)) {
126     LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
127                   << "invalid DataChannelInit.";
128     return false;
129   } else if (data_channel_type_ == cricket::DCT_SCTP) {
130     if (config.id < -1 ||
131         config.maxRetransmits < -1 ||
132         config.maxRetransmitTime < -1) {
133       LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
134                     << "invalid DataChannelInit.";
135       return false;
136     }
137     if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) {
138       LOG(LS_ERROR) <<
139           "maxRetransmits and maxRetransmitTime should not be both set.";
140       return false;
141     }
142     config_ = config;
143 
144     // Try to connect to the transport in case the transport channel already
145     // exists.
146     OnTransportChannelCreated();
147 
148     // Checks if the transport is ready to send because the initial channel
149     // ready signal may have been sent before the DataChannel creation.
150     // This has to be done async because the upper layer objects (e.g.
151     // Chrome glue and WebKit) are not wired up properly until after this
152     // function returns.
153     if (provider_->ReadyToSendData()) {
154       rtc::Thread::Current()->Post(this, MSG_CHANNELREADY, NULL);
155     }
156   }
157 
158   return true;
159 }
160 
~DataChannel()161 DataChannel::~DataChannel() {}
162 
RegisterObserver(DataChannelObserver * observer)163 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
164   observer_ = observer;
165   DeliverQueuedReceivedData();
166 }
167 
UnregisterObserver()168 void DataChannel::UnregisterObserver() {
169   observer_ = NULL;
170 }
171 
reliable() const172 bool DataChannel::reliable() const {
173   if (data_channel_type_ == cricket::DCT_RTP) {
174     return false;
175   } else {
176     return config_.maxRetransmits == -1 &&
177            config_.maxRetransmitTime == -1;
178   }
179 }
180 
buffered_amount() const181 uint64 DataChannel::buffered_amount() const {
182   return queued_send_data_.byte_count();
183 }
184 
Close()185 void DataChannel::Close() {
186   if (state_ == kClosed)
187     return;
188   send_ssrc_ = 0;
189   send_ssrc_set_ = false;
190   SetState(kClosing);
191   UpdateState();
192 }
193 
Send(const DataBuffer & buffer)194 bool DataChannel::Send(const DataBuffer& buffer) {
195   if (state_ != kOpen) {
196     return false;
197   }
198 
199   // TODO(jiayl): the spec is unclear about if the remote side should get the
200   // onmessage event. We need to figure out the expected behavior and change the
201   // code accordingly.
202   if (buffer.size() == 0) {
203     return true;
204   }
205 
206   // If the queue is non-empty, we're waiting for SignalReadyToSend,
207   // so just add to the end of the queue and keep waiting.
208   if (!queued_send_data_.Empty()) {
209     // Only SCTP DataChannel queues the outgoing data when the transport is
210     // blocked.
211     ASSERT(data_channel_type_ == cricket::DCT_SCTP);
212     if (!QueueSendDataMessage(buffer)) {
213       Close();
214     }
215     return true;
216   }
217 
218   bool success = SendDataMessage(buffer);
219   if (data_channel_type_ == cricket::DCT_RTP) {
220     return success;
221   }
222 
223   // Always return true for SCTP DataChannel per the spec.
224   return true;
225 }
226 
SetReceiveSsrc(uint32 receive_ssrc)227 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
228   ASSERT(data_channel_type_ == cricket::DCT_RTP);
229 
230   if (receive_ssrc_set_) {
231     return;
232   }
233   receive_ssrc_ = receive_ssrc;
234   receive_ssrc_set_ = true;
235   UpdateState();
236 }
237 
238 // The remote peer request that this channel shall be closed.
RemotePeerRequestClose()239 void DataChannel::RemotePeerRequestClose() {
240   DoClose();
241 }
242 
SetSctpSid(int sid)243 void DataChannel::SetSctpSid(int sid) {
244   ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
245   if (config_.id == sid)
246     return;
247 
248   config_.id = sid;
249   provider_->AddSctpDataStream(sid);
250 }
251 
OnTransportChannelCreated()252 void DataChannel::OnTransportChannelCreated() {
253   ASSERT(data_channel_type_ == cricket::DCT_SCTP);
254   if (!connected_to_provider_) {
255     connected_to_provider_ = provider_->ConnectDataChannel(this);
256   }
257   // The sid may have been unassigned when provider_->ConnectDataChannel was
258   // done. So always add the streams even if connected_to_provider_ is true.
259   if (config_.id >= 0) {
260     provider_->AddSctpDataStream(config_.id);
261   }
262 }
263 
SetSendSsrc(uint32 send_ssrc)264 void DataChannel::SetSendSsrc(uint32 send_ssrc) {
265   ASSERT(data_channel_type_ == cricket::DCT_RTP);
266   if (send_ssrc_set_) {
267     return;
268   }
269   send_ssrc_ = send_ssrc;
270   send_ssrc_set_ = true;
271   UpdateState();
272 }
273 
OnMessage(rtc::Message * msg)274 void DataChannel::OnMessage(rtc::Message* msg) {
275   switch (msg->message_id) {
276     case MSG_CHANNELREADY:
277       OnChannelReady(true);
278       break;
279   }
280 }
281 
282 // The underlaying data engine is closing.
283 // This function makes sure the DataChannel is disconnected and changes state to
284 // kClosed.
OnDataEngineClose()285 void DataChannel::OnDataEngineClose() {
286   DoClose();
287 }
288 
OnDataReceived(cricket::DataChannel * channel,const cricket::ReceiveDataParams & params,const rtc::Buffer & payload)289 void DataChannel::OnDataReceived(cricket::DataChannel* channel,
290                                  const cricket::ReceiveDataParams& params,
291                                  const rtc::Buffer& payload) {
292   uint32 expected_ssrc =
293       (data_channel_type_ == cricket::DCT_RTP) ? receive_ssrc_ : config_.id;
294   if (params.ssrc != expected_ssrc) {
295     return;
296   }
297 
298   if (params.type == cricket::DMT_CONTROL) {
299     ASSERT(data_channel_type_ == cricket::DCT_SCTP);
300     if (!waiting_for_open_ack_) {
301       // Ignore it if we are not expecting an ACK message.
302       LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
303                       << "sid = " << params.ssrc;
304       return;
305     }
306     if (ParseDataChannelOpenAckMessage(payload)) {
307       // We can send unordered as soon as we receive the ACK message.
308       waiting_for_open_ack_ = false;
309       LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
310                    << params.ssrc;
311     } else {
312       LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = "
313                       << params.ssrc;
314     }
315     return;
316   }
317 
318   ASSERT(params.type == cricket::DMT_BINARY ||
319          params.type == cricket::DMT_TEXT);
320 
321   LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc;
322   // We can send unordered as soon as we receive any DATA message since the
323   // remote side must have received the OPEN (and old clients do not send
324   // OPEN_ACK).
325   waiting_for_open_ack_ = false;
326 
327   bool binary = (params.type == cricket::DMT_BINARY);
328   rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
329   if (was_ever_writable_ && observer_) {
330     observer_->OnMessage(*buffer.get());
331   } else {
332     if (queued_received_data_.byte_count() + payload.length() >
333             kMaxQueuedReceivedDataBytes) {
334       LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
335 
336       queued_received_data_.Clear();
337       if (data_channel_type_ != cricket::DCT_RTP) {
338         Close();
339       }
340 
341       return;
342     }
343     queued_received_data_.Push(buffer.release());
344   }
345 }
346 
OnChannelReady(bool writable)347 void DataChannel::OnChannelReady(bool writable) {
348   if (!writable) {
349     return;
350   }
351   // Update the readyState and send the queued control message if the channel
352   // is writable for the first time; otherwise it means the channel was blocked
353   // for sending and now unblocked, so send the queued data now.
354   if (!was_ever_writable_) {
355     was_ever_writable_ = true;
356 
357     if (data_channel_type_ == cricket::DCT_SCTP) {
358       rtc::Buffer payload;
359 
360       if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
361         WriteDataChannelOpenMessage(label_, config_, &payload);
362         SendControlMessage(payload);
363       } else if (config_.open_handshake_role ==
364                      InternalDataChannelInit::kAcker) {
365         WriteDataChannelOpenAckMessage(&payload);
366         SendControlMessage(payload);
367       }
368     }
369 
370     UpdateState();
371     ASSERT(queued_send_data_.Empty());
372   } else if (state_ == kOpen) {
373     // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
374     // that the readyState is open. According to the standard, the channel
375     // should not become open before the OPEN message is sent.
376     SendQueuedControlMessages();
377 
378     SendQueuedDataMessages();
379   }
380 }
381 
DoClose()382 void DataChannel::DoClose() {
383   if (state_ == kClosed)
384     return;
385 
386   receive_ssrc_set_ = false;
387   send_ssrc_set_ = false;
388   SetState(kClosing);
389   UpdateState();
390 }
391 
UpdateState()392 void DataChannel::UpdateState() {
393   switch (state_) {
394     case kConnecting: {
395       if (send_ssrc_set_ == receive_ssrc_set_) {
396         if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
397           connected_to_provider_ = provider_->ConnectDataChannel(this);
398         }
399         if (was_ever_writable_) {
400           // TODO(jiayl): Do not transition to kOpen if we failed to send the
401           // OPEN message.
402           SendQueuedControlMessages();
403           SetState(kOpen);
404           // If we have received buffers before the channel got writable.
405           // Deliver them now.
406           DeliverQueuedReceivedData();
407         }
408       }
409       break;
410     }
411     case kOpen: {
412       break;
413     }
414     case kClosing: {
415       DisconnectFromTransport();
416 
417       if (!send_ssrc_set_ && !receive_ssrc_set_) {
418         SetState(kClosed);
419       }
420       break;
421     }
422     case kClosed:
423       break;
424   }
425 }
426 
SetState(DataState state)427 void DataChannel::SetState(DataState state) {
428   if (state_ == state)
429     return;
430 
431   state_ = state;
432   if (observer_) {
433     observer_->OnStateChange();
434   }
435 }
436 
DisconnectFromTransport()437 void DataChannel::DisconnectFromTransport() {
438   if (!connected_to_provider_)
439     return;
440 
441   provider_->DisconnectDataChannel(this);
442   connected_to_provider_ = false;
443 
444   if (data_channel_type_ == cricket::DCT_SCTP) {
445     provider_->RemoveSctpDataStream(config_.id);
446   }
447 }
448 
DeliverQueuedReceivedData()449 void DataChannel::DeliverQueuedReceivedData() {
450   if (!was_ever_writable_ || !observer_) {
451     return;
452   }
453 
454   while (!queued_received_data_.Empty()) {
455     rtc::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front());
456     observer_->OnMessage(*buffer);
457     queued_received_data_.Pop();
458   }
459 }
460 
SendQueuedDataMessages()461 void DataChannel::SendQueuedDataMessages() {
462   ASSERT(was_ever_writable_ && state_ == kOpen);
463 
464   PacketQueue packet_buffer;
465   packet_buffer.Swap(&queued_send_data_);
466 
467   while (!packet_buffer.Empty()) {
468     rtc::scoped_ptr<DataBuffer> buffer(packet_buffer.Front());
469     SendDataMessage(*buffer);
470     packet_buffer.Pop();
471   }
472 }
473 
SendDataMessage(const DataBuffer & buffer)474 bool DataChannel::SendDataMessage(const DataBuffer& buffer) {
475   cricket::SendDataParams send_params;
476 
477   if (data_channel_type_ == cricket::DCT_SCTP) {
478     send_params.ordered = config_.ordered;
479     // Send as ordered if it is waiting for the OPEN_ACK message.
480     if (waiting_for_open_ack_ && !config_.ordered) {
481       send_params.ordered = true;
482       LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
483                       << "because the OPEN_ACK message has not been received.";
484     }
485 
486     send_params.max_rtx_count = config_.maxRetransmits;
487     send_params.max_rtx_ms = config_.maxRetransmitTime;
488     send_params.ssrc = config_.id;
489   } else {
490     send_params.ssrc = send_ssrc_;
491   }
492   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
493 
494   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
495   bool success = provider_->SendData(send_params, buffer.data, &send_result);
496 
497   if (!success && data_channel_type_ == cricket::DCT_SCTP) {
498     if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) {
499       LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
500                     << "send_result = " << send_result;
501       Close();
502     }
503   }
504   return success;
505 }
506 
QueueSendDataMessage(const DataBuffer & buffer)507 bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
508   if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
509     LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
510     return false;
511   }
512   queued_send_data_.Push(new DataBuffer(buffer));
513   return true;
514 }
515 
SendQueuedControlMessages()516 void DataChannel::SendQueuedControlMessages() {
517   ASSERT(was_ever_writable_);
518 
519   PacketQueue control_packets;
520   control_packets.Swap(&queued_control_data_);
521 
522   while (!control_packets.Empty()) {
523     rtc::scoped_ptr<DataBuffer> buf(control_packets.Front());
524     SendControlMessage(buf->data);
525     control_packets.Pop();
526   }
527 }
528 
QueueControlMessage(const rtc::Buffer & buffer)529 void DataChannel::QueueControlMessage(const rtc::Buffer& buffer) {
530   queued_control_data_.Push(new DataBuffer(buffer, true));
531 }
532 
SendControlMessage(const rtc::Buffer & buffer)533 bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
534   bool is_open_message =
535       (config_.open_handshake_role == InternalDataChannelInit::kOpener);
536 
537   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
538          was_ever_writable_ &&
539          config_.id >= 0 &&
540          (!is_open_message || !config_.negotiated));
541 
542   cricket::SendDataParams send_params;
543   send_params.ssrc = config_.id;
544   send_params.ordered = config_.ordered || is_open_message;
545   send_params.type = cricket::DMT_CONTROL;
546 
547   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
548   bool retval = provider_->SendData(send_params, buffer, &send_result);
549   if (retval) {
550     LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
551 
552     if (is_open_message) {
553       // Send data as ordered before we receive any message from the remote peer
554       // to make sure the remote peer will not receive any data before it
555       // receives the OPEN message.
556       waiting_for_open_ack_ = true;
557     }
558   } else if (send_result == cricket::SDR_BLOCK) {
559     QueueControlMessage(buffer);
560   } else {
561     LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
562                   << " the CONTROL message, send_result = " << send_result;
563     Close();
564   }
565   return retval;
566 }
567 
568 }  // namespace webrtc
569