• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "pc/data_channel_controller.h"
12 
13 #include <utility>
14 
15 #include "api/peer_connection_interface.h"
16 #include "api/rtc_error.h"
17 #include "pc/peer_connection_internal.h"
18 #include "pc/sctp_utils.h"
19 #include "rtc_base/logging.h"
20 
21 namespace webrtc {
22 
~DataChannelController()23 DataChannelController::~DataChannelController() {
24   // Since channels may have multiple owners, we cannot guarantee that
25   // they will be deallocated before destroying the controller.
26   // Therefore, detach them from the controller.
27   for (auto channel : sctp_data_channels_) {
28     channel->DetachFromController();
29   }
30 }
31 
HasDataChannels() const32 bool DataChannelController::HasDataChannels() const {
33   RTC_DCHECK_RUN_ON(signaling_thread());
34   return !sctp_data_channels_.empty();
35 }
36 
SendData(int sid,const SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)37 bool DataChannelController::SendData(int sid,
38                                      const SendDataParams& params,
39                                      const rtc::CopyOnWriteBuffer& payload,
40                                      cricket::SendDataResult* result) {
41   if (data_channel_transport())
42     return DataChannelSendData(sid, params, payload, result);
43   RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
44   return false;
45 }
46 
ConnectDataChannel(SctpDataChannel * webrtc_data_channel)47 bool DataChannelController::ConnectDataChannel(
48     SctpDataChannel* webrtc_data_channel) {
49   RTC_DCHECK_RUN_ON(signaling_thread());
50   if (!data_channel_transport()) {
51     // Don't log an error here, because DataChannels are expected to call
52     // ConnectDataChannel in this state. It's the only way to initially tell
53     // whether or not the underlying transport is ready.
54     return false;
55   }
56   SignalDataChannelTransportWritable_s.connect(
57       webrtc_data_channel, &SctpDataChannel::OnTransportReady);
58   SignalDataChannelTransportReceivedData_s.connect(
59       webrtc_data_channel, &SctpDataChannel::OnDataReceived);
60   SignalDataChannelTransportChannelClosing_s.connect(
61       webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely);
62   SignalDataChannelTransportChannelClosed_s.connect(
63       webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete);
64   return true;
65 }
66 
DisconnectDataChannel(SctpDataChannel * webrtc_data_channel)67 void DataChannelController::DisconnectDataChannel(
68     SctpDataChannel* webrtc_data_channel) {
69   RTC_DCHECK_RUN_ON(signaling_thread());
70   if (!data_channel_transport()) {
71     RTC_LOG(LS_ERROR)
72         << "DisconnectDataChannel called when sctp_transport_ is NULL.";
73     return;
74   }
75   SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
76   SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
77   SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
78   SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
79 }
80 
AddSctpDataStream(int sid)81 void DataChannelController::AddSctpDataStream(int sid) {
82   if (data_channel_transport()) {
83     network_thread()->BlockingCall([this, sid] {
84       if (data_channel_transport()) {
85         data_channel_transport()->OpenChannel(sid);
86       }
87     });
88   }
89 }
90 
RemoveSctpDataStream(int sid)91 void DataChannelController::RemoveSctpDataStream(int sid) {
92   if (data_channel_transport()) {
93     network_thread()->BlockingCall([this, sid] {
94       if (data_channel_transport()) {
95         data_channel_transport()->CloseChannel(sid);
96       }
97     });
98   }
99 }
100 
ReadyToSendData() const101 bool DataChannelController::ReadyToSendData() const {
102   RTC_DCHECK_RUN_ON(signaling_thread());
103   return (data_channel_transport() && data_channel_transport_ready_to_send_);
104 }
105 
OnDataReceived(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)106 void DataChannelController::OnDataReceived(
107     int channel_id,
108     DataMessageType type,
109     const rtc::CopyOnWriteBuffer& buffer) {
110   RTC_DCHECK_RUN_ON(network_thread());
111   cricket::ReceiveDataParams params;
112   params.sid = channel_id;
113   params.type = type;
114   signaling_thread()->PostTask(
115       [self = weak_factory_.GetWeakPtr(), params, buffer] {
116         if (self) {
117           RTC_DCHECK_RUN_ON(self->signaling_thread());
118           // TODO(bugs.webrtc.org/11547): The data being received should be
119           // delivered on the network thread. The way HandleOpenMessage_s works
120           // right now is that it's called for all types of buffers and operates
121           // as a selector function. Change this so that it's only called for
122           // buffers that it should be able to handle. Once we do that, we can
123           // deliver all other buffers on the network thread (change
124           // SignalDataChannelTransportReceivedData_s to
125           // SignalDataChannelTransportReceivedData_n).
126           if (!self->HandleOpenMessage_s(params, buffer)) {
127             self->SignalDataChannelTransportReceivedData_s(params, buffer);
128           }
129         }
130       });
131 }
132 
OnChannelClosing(int channel_id)133 void DataChannelController::OnChannelClosing(int channel_id) {
134   RTC_DCHECK_RUN_ON(network_thread());
135   signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
136     if (self) {
137       RTC_DCHECK_RUN_ON(self->signaling_thread());
138       self->SignalDataChannelTransportChannelClosing_s(channel_id);
139     }
140   });
141 }
142 
OnChannelClosed(int channel_id)143 void DataChannelController::OnChannelClosed(int channel_id) {
144   RTC_DCHECK_RUN_ON(network_thread());
145   signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
146     if (self) {
147       RTC_DCHECK_RUN_ON(self->signaling_thread());
148       self->SignalDataChannelTransportChannelClosed_s(channel_id);
149     }
150   });
151 }
152 
OnReadyToSend()153 void DataChannelController::OnReadyToSend() {
154   RTC_DCHECK_RUN_ON(network_thread());
155   signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
156     if (self) {
157       RTC_DCHECK_RUN_ON(self->signaling_thread());
158       self->data_channel_transport_ready_to_send_ = true;
159       self->SignalDataChannelTransportWritable_s(
160           self->data_channel_transport_ready_to_send_);
161     }
162   });
163 }
164 
OnTransportClosed(RTCError error)165 void DataChannelController::OnTransportClosed(RTCError error) {
166   RTC_DCHECK_RUN_ON(network_thread());
167   signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] {
168     if (self) {
169       RTC_DCHECK_RUN_ON(self->signaling_thread());
170       self->OnTransportChannelClosed(error);
171     }
172   });
173 }
174 
SetupDataChannelTransport_n()175 void DataChannelController::SetupDataChannelTransport_n() {
176   RTC_DCHECK_RUN_ON(network_thread());
177 
178   // There's a new data channel transport.  This needs to be signaled to the
179   // `sctp_data_channels_` so that they can reopen and reconnect.  This is
180   // necessary when bundling is applied.
181   NotifyDataChannelsOfTransportCreated();
182 }
183 
TeardownDataChannelTransport_n()184 void DataChannelController::TeardownDataChannelTransport_n() {
185   RTC_DCHECK_RUN_ON(network_thread());
186   if (data_channel_transport()) {
187     data_channel_transport()->SetDataSink(nullptr);
188   }
189   set_data_channel_transport(nullptr);
190 }
191 
OnTransportChanged(DataChannelTransportInterface * new_data_channel_transport)192 void DataChannelController::OnTransportChanged(
193     DataChannelTransportInterface* new_data_channel_transport) {
194   RTC_DCHECK_RUN_ON(network_thread());
195   if (data_channel_transport() &&
196       data_channel_transport() != new_data_channel_transport) {
197     // Changed which data channel transport is used for `sctp_mid_` (eg. now
198     // it's bundled).
199     data_channel_transport()->SetDataSink(nullptr);
200     set_data_channel_transport(new_data_channel_transport);
201     if (new_data_channel_transport) {
202       new_data_channel_transport->SetDataSink(this);
203 
204       // There's a new data channel transport.  This needs to be signaled to the
205       // `sctp_data_channels_` so that they can reopen and reconnect.  This is
206       // necessary when bundling is applied.
207       NotifyDataChannelsOfTransportCreated();
208     }
209   }
210 }
211 
GetDataChannelStats() const212 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
213     const {
214   RTC_DCHECK_RUN_ON(signaling_thread());
215   std::vector<DataChannelStats> stats;
216   stats.reserve(sctp_data_channels_.size());
217   for (const auto& channel : sctp_data_channels_)
218     stats.push_back(channel->GetStats());
219   return stats;
220 }
221 
HandleOpenMessage_s(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & buffer)222 bool DataChannelController::HandleOpenMessage_s(
223     const cricket::ReceiveDataParams& params,
224     const rtc::CopyOnWriteBuffer& buffer) {
225   if (params.type == DataMessageType::kControl && IsOpenMessage(buffer)) {
226     // Received OPEN message; parse and signal that a new data channel should
227     // be created.
228     std::string label;
229     InternalDataChannelInit config;
230     config.id = params.sid;
231     if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
232       RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid "
233                           << params.sid;
234       return true;
235     }
236     config.open_handshake_role = InternalDataChannelInit::kAcker;
237     OnDataChannelOpenMessage(label, config);
238     return true;
239   }
240   return false;
241 }
242 
OnDataChannelOpenMessage(const std::string & label,const InternalDataChannelInit & config)243 void DataChannelController::OnDataChannelOpenMessage(
244     const std::string& label,
245     const InternalDataChannelInit& config) {
246   rtc::scoped_refptr<DataChannelInterface> channel(
247       InternalCreateDataChannelWithProxy(label, &config));
248   if (!channel.get()) {
249     RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
250     return;
251   }
252 
253   pc_->Observer()->OnDataChannel(std::move(channel));
254   pc_->NoteDataAddedEvent();
255 }
256 
257 rtc::scoped_refptr<DataChannelInterface>
InternalCreateDataChannelWithProxy(const std::string & label,const InternalDataChannelInit * config)258 DataChannelController::InternalCreateDataChannelWithProxy(
259     const std::string& label,
260     const InternalDataChannelInit* config) {
261   RTC_DCHECK_RUN_ON(signaling_thread());
262   if (pc_->IsClosed()) {
263     return nullptr;
264   }
265 
266   rtc::scoped_refptr<SctpDataChannel> channel =
267       InternalCreateSctpDataChannel(label, config);
268   if (channel) {
269     return SctpDataChannel::CreateProxy(channel);
270   }
271 
272   return nullptr;
273 }
274 
275 rtc::scoped_refptr<SctpDataChannel>
InternalCreateSctpDataChannel(const std::string & label,const InternalDataChannelInit * config)276 DataChannelController::InternalCreateSctpDataChannel(
277     const std::string& label,
278     const InternalDataChannelInit* config) {
279   RTC_DCHECK_RUN_ON(signaling_thread());
280   InternalDataChannelInit new_config =
281       config ? (*config) : InternalDataChannelInit();
282   if (new_config.id < 0) {
283     rtc::SSLRole role;
284     if ((pc_->GetSctpSslRole(&role)) &&
285         !sid_allocator_.AllocateSid(role, &new_config.id)) {
286       RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel.";
287       return nullptr;
288     }
289   } else if (!sid_allocator_.ReserveSid(new_config.id)) {
290     RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
291                          "because the id is already in use or out of range.";
292     return nullptr;
293   }
294   rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
295       this, label, new_config, signaling_thread(), network_thread()));
296   if (!channel) {
297     sid_allocator_.ReleaseSid(new_config.id);
298     return nullptr;
299   }
300   sctp_data_channels_.push_back(channel);
301   channel->SignalClosed.connect(
302       pc_, &PeerConnectionInternal::OnSctpDataChannelClosed);
303   SignalSctpDataChannelCreated_(channel.get());
304   return channel;
305 }
306 
AllocateSctpSids(rtc::SSLRole role)307 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
308   RTC_DCHECK_RUN_ON(signaling_thread());
309   std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
310   for (const auto& channel : sctp_data_channels_) {
311     if (channel->id() < 0) {
312       int sid;
313       if (!sid_allocator_.AllocateSid(role, &sid)) {
314         RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
315         channels_to_close.push_back(channel);
316         continue;
317       }
318       channel->SetSctpSid(sid);
319     }
320   }
321   // Since closing modifies the list of channels, we have to do the actual
322   // closing outside the loop.
323   for (const auto& channel : channels_to_close) {
324     channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
325   }
326 }
327 
OnSctpDataChannelClosed(SctpDataChannel * channel)328 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
329   RTC_DCHECK_RUN_ON(signaling_thread());
330   for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
331        ++it) {
332     if (it->get() == channel) {
333       if (channel->id() >= 0) {
334         // After the closing procedure is done, it's safe to use this ID for
335         // another data channel.
336         sid_allocator_.ReleaseSid(channel->id());
337       }
338       // Since this method is triggered by a signal from the DataChannel,
339       // we can't free it directly here; we need to free it asynchronously.
340       sctp_data_channels_to_free_.push_back(*it);
341       sctp_data_channels_.erase(it);
342       signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
343         if (self) {
344           RTC_DCHECK_RUN_ON(self->signaling_thread());
345           self->sctp_data_channels_to_free_.clear();
346         }
347       });
348       return;
349     }
350   }
351 }
352 
OnTransportChannelClosed(RTCError error)353 void DataChannelController::OnTransportChannelClosed(RTCError error) {
354   RTC_DCHECK_RUN_ON(signaling_thread());
355   // Use a temporary copy of the SCTP DataChannel list because the
356   // DataChannel may callback to us and try to modify the list.
357   std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
358   temp_sctp_dcs.swap(sctp_data_channels_);
359   for (const auto& channel : temp_sctp_dcs) {
360     channel->OnTransportChannelClosed(error);
361   }
362 }
363 
data_channel_transport() const364 DataChannelTransportInterface* DataChannelController::data_channel_transport()
365     const {
366   // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
367   // network thread.
368   // RTC_DCHECK_RUN_ON(network_thread());
369   return data_channel_transport_;
370 }
371 
set_data_channel_transport(DataChannelTransportInterface * transport)372 void DataChannelController::set_data_channel_transport(
373     DataChannelTransportInterface* transport) {
374   RTC_DCHECK_RUN_ON(network_thread());
375   data_channel_transport_ = transport;
376 }
377 
DataChannelSendData(int sid,const SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)378 bool DataChannelController::DataChannelSendData(
379     int sid,
380     const SendDataParams& params,
381     const rtc::CopyOnWriteBuffer& payload,
382     cricket::SendDataResult* result) {
383   // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
384   // thread instead. Remove the BlockingCall() below and move assocated state to
385   // the network thread.
386   RTC_DCHECK_RUN_ON(signaling_thread());
387   RTC_DCHECK(data_channel_transport());
388 
389   RTCError error = network_thread()->BlockingCall([this, sid, params, payload] {
390     return data_channel_transport()->SendData(sid, params, payload);
391   });
392 
393   if (error.ok()) {
394     *result = cricket::SendDataResult::SDR_SUCCESS;
395     return true;
396   } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
397     // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
398     // TODO(mellem):  Stop using RTCError here and get rid of the mapping.
399     *result = cricket::SendDataResult::SDR_BLOCK;
400     return false;
401   }
402   *result = cricket::SendDataResult::SDR_ERROR;
403   return false;
404 }
405 
NotifyDataChannelsOfTransportCreated()406 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
407   RTC_DCHECK_RUN_ON(network_thread());
408   signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
409     if (self) {
410       RTC_DCHECK_RUN_ON(self->signaling_thread());
411       for (const auto& channel : self->sctp_data_channels_) {
412         channel->OnTransportChannelCreated();
413       }
414     }
415   });
416 }
417 
network_thread() const418 rtc::Thread* DataChannelController::network_thread() const {
419   return pc_->network_thread();
420 }
signaling_thread() const421 rtc::Thread* DataChannelController::signaling_thread() const {
422   return pc_->signaling_thread();
423 }
424 
425 }  // namespace webrtc
426