1 /*
2 * Copyright 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "pc/data_channel_controller.h"
12
13 #include <utility>
14
15 #include "api/peer_connection_interface.h"
16 #include "api/rtc_error.h"
17 #include "pc/peer_connection_internal.h"
18 #include "pc/sctp_utils.h"
19 #include "rtc_base/logging.h"
20
21 namespace webrtc {
22
~DataChannelController()23 DataChannelController::~DataChannelController() {
24 // Since channels may have multiple owners, we cannot guarantee that
25 // they will be deallocated before destroying the controller.
26 // Therefore, detach them from the controller.
27 for (auto channel : sctp_data_channels_) {
28 channel->DetachFromController();
29 }
30 }
31
HasDataChannels() const32 bool DataChannelController::HasDataChannels() const {
33 RTC_DCHECK_RUN_ON(signaling_thread());
34 return !sctp_data_channels_.empty();
35 }
36
SendData(int sid,const SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)37 bool DataChannelController::SendData(int sid,
38 const SendDataParams& params,
39 const rtc::CopyOnWriteBuffer& payload,
40 cricket::SendDataResult* result) {
41 if (data_channel_transport())
42 return DataChannelSendData(sid, params, payload, result);
43 RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
44 return false;
45 }
46
ConnectDataChannel(SctpDataChannel * webrtc_data_channel)47 bool DataChannelController::ConnectDataChannel(
48 SctpDataChannel* webrtc_data_channel) {
49 RTC_DCHECK_RUN_ON(signaling_thread());
50 if (!data_channel_transport()) {
51 // Don't log an error here, because DataChannels are expected to call
52 // ConnectDataChannel in this state. It's the only way to initially tell
53 // whether or not the underlying transport is ready.
54 return false;
55 }
56 SignalDataChannelTransportWritable_s.connect(
57 webrtc_data_channel, &SctpDataChannel::OnTransportReady);
58 SignalDataChannelTransportReceivedData_s.connect(
59 webrtc_data_channel, &SctpDataChannel::OnDataReceived);
60 SignalDataChannelTransportChannelClosing_s.connect(
61 webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely);
62 SignalDataChannelTransportChannelClosed_s.connect(
63 webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete);
64 return true;
65 }
66
DisconnectDataChannel(SctpDataChannel * webrtc_data_channel)67 void DataChannelController::DisconnectDataChannel(
68 SctpDataChannel* webrtc_data_channel) {
69 RTC_DCHECK_RUN_ON(signaling_thread());
70 if (!data_channel_transport()) {
71 RTC_LOG(LS_ERROR)
72 << "DisconnectDataChannel called when sctp_transport_ is NULL.";
73 return;
74 }
75 SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
76 SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
77 SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
78 SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
79 }
80
AddSctpDataStream(int sid)81 void DataChannelController::AddSctpDataStream(int sid) {
82 if (data_channel_transport()) {
83 network_thread()->BlockingCall([this, sid] {
84 if (data_channel_transport()) {
85 data_channel_transport()->OpenChannel(sid);
86 }
87 });
88 }
89 }
90
RemoveSctpDataStream(int sid)91 void DataChannelController::RemoveSctpDataStream(int sid) {
92 if (data_channel_transport()) {
93 network_thread()->BlockingCall([this, sid] {
94 if (data_channel_transport()) {
95 data_channel_transport()->CloseChannel(sid);
96 }
97 });
98 }
99 }
100
ReadyToSendData() const101 bool DataChannelController::ReadyToSendData() const {
102 RTC_DCHECK_RUN_ON(signaling_thread());
103 return (data_channel_transport() && data_channel_transport_ready_to_send_);
104 }
105
OnDataReceived(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)106 void DataChannelController::OnDataReceived(
107 int channel_id,
108 DataMessageType type,
109 const rtc::CopyOnWriteBuffer& buffer) {
110 RTC_DCHECK_RUN_ON(network_thread());
111 cricket::ReceiveDataParams params;
112 params.sid = channel_id;
113 params.type = type;
114 signaling_thread()->PostTask(
115 [self = weak_factory_.GetWeakPtr(), params, buffer] {
116 if (self) {
117 RTC_DCHECK_RUN_ON(self->signaling_thread());
118 // TODO(bugs.webrtc.org/11547): The data being received should be
119 // delivered on the network thread. The way HandleOpenMessage_s works
120 // right now is that it's called for all types of buffers and operates
121 // as a selector function. Change this so that it's only called for
122 // buffers that it should be able to handle. Once we do that, we can
123 // deliver all other buffers on the network thread (change
124 // SignalDataChannelTransportReceivedData_s to
125 // SignalDataChannelTransportReceivedData_n).
126 if (!self->HandleOpenMessage_s(params, buffer)) {
127 self->SignalDataChannelTransportReceivedData_s(params, buffer);
128 }
129 }
130 });
131 }
132
OnChannelClosing(int channel_id)133 void DataChannelController::OnChannelClosing(int channel_id) {
134 RTC_DCHECK_RUN_ON(network_thread());
135 signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
136 if (self) {
137 RTC_DCHECK_RUN_ON(self->signaling_thread());
138 self->SignalDataChannelTransportChannelClosing_s(channel_id);
139 }
140 });
141 }
142
OnChannelClosed(int channel_id)143 void DataChannelController::OnChannelClosed(int channel_id) {
144 RTC_DCHECK_RUN_ON(network_thread());
145 signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
146 if (self) {
147 RTC_DCHECK_RUN_ON(self->signaling_thread());
148 self->SignalDataChannelTransportChannelClosed_s(channel_id);
149 }
150 });
151 }
152
OnReadyToSend()153 void DataChannelController::OnReadyToSend() {
154 RTC_DCHECK_RUN_ON(network_thread());
155 signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
156 if (self) {
157 RTC_DCHECK_RUN_ON(self->signaling_thread());
158 self->data_channel_transport_ready_to_send_ = true;
159 self->SignalDataChannelTransportWritable_s(
160 self->data_channel_transport_ready_to_send_);
161 }
162 });
163 }
164
OnTransportClosed(RTCError error)165 void DataChannelController::OnTransportClosed(RTCError error) {
166 RTC_DCHECK_RUN_ON(network_thread());
167 signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] {
168 if (self) {
169 RTC_DCHECK_RUN_ON(self->signaling_thread());
170 self->OnTransportChannelClosed(error);
171 }
172 });
173 }
174
SetupDataChannelTransport_n()175 void DataChannelController::SetupDataChannelTransport_n() {
176 RTC_DCHECK_RUN_ON(network_thread());
177
178 // There's a new data channel transport. This needs to be signaled to the
179 // `sctp_data_channels_` so that they can reopen and reconnect. This is
180 // necessary when bundling is applied.
181 NotifyDataChannelsOfTransportCreated();
182 }
183
TeardownDataChannelTransport_n()184 void DataChannelController::TeardownDataChannelTransport_n() {
185 RTC_DCHECK_RUN_ON(network_thread());
186 if (data_channel_transport()) {
187 data_channel_transport()->SetDataSink(nullptr);
188 }
189 set_data_channel_transport(nullptr);
190 }
191
OnTransportChanged(DataChannelTransportInterface * new_data_channel_transport)192 void DataChannelController::OnTransportChanged(
193 DataChannelTransportInterface* new_data_channel_transport) {
194 RTC_DCHECK_RUN_ON(network_thread());
195 if (data_channel_transport() &&
196 data_channel_transport() != new_data_channel_transport) {
197 // Changed which data channel transport is used for `sctp_mid_` (eg. now
198 // it's bundled).
199 data_channel_transport()->SetDataSink(nullptr);
200 set_data_channel_transport(new_data_channel_transport);
201 if (new_data_channel_transport) {
202 new_data_channel_transport->SetDataSink(this);
203
204 // There's a new data channel transport. This needs to be signaled to the
205 // `sctp_data_channels_` so that they can reopen and reconnect. This is
206 // necessary when bundling is applied.
207 NotifyDataChannelsOfTransportCreated();
208 }
209 }
210 }
211
GetDataChannelStats() const212 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
213 const {
214 RTC_DCHECK_RUN_ON(signaling_thread());
215 std::vector<DataChannelStats> stats;
216 stats.reserve(sctp_data_channels_.size());
217 for (const auto& channel : sctp_data_channels_)
218 stats.push_back(channel->GetStats());
219 return stats;
220 }
221
HandleOpenMessage_s(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & buffer)222 bool DataChannelController::HandleOpenMessage_s(
223 const cricket::ReceiveDataParams& params,
224 const rtc::CopyOnWriteBuffer& buffer) {
225 if (params.type == DataMessageType::kControl && IsOpenMessage(buffer)) {
226 // Received OPEN message; parse and signal that a new data channel should
227 // be created.
228 std::string label;
229 InternalDataChannelInit config;
230 config.id = params.sid;
231 if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
232 RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid "
233 << params.sid;
234 return true;
235 }
236 config.open_handshake_role = InternalDataChannelInit::kAcker;
237 OnDataChannelOpenMessage(label, config);
238 return true;
239 }
240 return false;
241 }
242
OnDataChannelOpenMessage(const std::string & label,const InternalDataChannelInit & config)243 void DataChannelController::OnDataChannelOpenMessage(
244 const std::string& label,
245 const InternalDataChannelInit& config) {
246 rtc::scoped_refptr<DataChannelInterface> channel(
247 InternalCreateDataChannelWithProxy(label, &config));
248 if (!channel.get()) {
249 RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
250 return;
251 }
252
253 pc_->Observer()->OnDataChannel(std::move(channel));
254 pc_->NoteDataAddedEvent();
255 }
256
257 rtc::scoped_refptr<DataChannelInterface>
InternalCreateDataChannelWithProxy(const std::string & label,const InternalDataChannelInit * config)258 DataChannelController::InternalCreateDataChannelWithProxy(
259 const std::string& label,
260 const InternalDataChannelInit* config) {
261 RTC_DCHECK_RUN_ON(signaling_thread());
262 if (pc_->IsClosed()) {
263 return nullptr;
264 }
265
266 rtc::scoped_refptr<SctpDataChannel> channel =
267 InternalCreateSctpDataChannel(label, config);
268 if (channel) {
269 return SctpDataChannel::CreateProxy(channel);
270 }
271
272 return nullptr;
273 }
274
275 rtc::scoped_refptr<SctpDataChannel>
InternalCreateSctpDataChannel(const std::string & label,const InternalDataChannelInit * config)276 DataChannelController::InternalCreateSctpDataChannel(
277 const std::string& label,
278 const InternalDataChannelInit* config) {
279 RTC_DCHECK_RUN_ON(signaling_thread());
280 InternalDataChannelInit new_config =
281 config ? (*config) : InternalDataChannelInit();
282 if (new_config.id < 0) {
283 rtc::SSLRole role;
284 if ((pc_->GetSctpSslRole(&role)) &&
285 !sid_allocator_.AllocateSid(role, &new_config.id)) {
286 RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel.";
287 return nullptr;
288 }
289 } else if (!sid_allocator_.ReserveSid(new_config.id)) {
290 RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
291 "because the id is already in use or out of range.";
292 return nullptr;
293 }
294 rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
295 this, label, new_config, signaling_thread(), network_thread()));
296 if (!channel) {
297 sid_allocator_.ReleaseSid(new_config.id);
298 return nullptr;
299 }
300 sctp_data_channels_.push_back(channel);
301 channel->SignalClosed.connect(
302 pc_, &PeerConnectionInternal::OnSctpDataChannelClosed);
303 SignalSctpDataChannelCreated_(channel.get());
304 return channel;
305 }
306
AllocateSctpSids(rtc::SSLRole role)307 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
308 RTC_DCHECK_RUN_ON(signaling_thread());
309 std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
310 for (const auto& channel : sctp_data_channels_) {
311 if (channel->id() < 0) {
312 int sid;
313 if (!sid_allocator_.AllocateSid(role, &sid)) {
314 RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
315 channels_to_close.push_back(channel);
316 continue;
317 }
318 channel->SetSctpSid(sid);
319 }
320 }
321 // Since closing modifies the list of channels, we have to do the actual
322 // closing outside the loop.
323 for (const auto& channel : channels_to_close) {
324 channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
325 }
326 }
327
OnSctpDataChannelClosed(SctpDataChannel * channel)328 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
329 RTC_DCHECK_RUN_ON(signaling_thread());
330 for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
331 ++it) {
332 if (it->get() == channel) {
333 if (channel->id() >= 0) {
334 // After the closing procedure is done, it's safe to use this ID for
335 // another data channel.
336 sid_allocator_.ReleaseSid(channel->id());
337 }
338 // Since this method is triggered by a signal from the DataChannel,
339 // we can't free it directly here; we need to free it asynchronously.
340 sctp_data_channels_to_free_.push_back(*it);
341 sctp_data_channels_.erase(it);
342 signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
343 if (self) {
344 RTC_DCHECK_RUN_ON(self->signaling_thread());
345 self->sctp_data_channels_to_free_.clear();
346 }
347 });
348 return;
349 }
350 }
351 }
352
OnTransportChannelClosed(RTCError error)353 void DataChannelController::OnTransportChannelClosed(RTCError error) {
354 RTC_DCHECK_RUN_ON(signaling_thread());
355 // Use a temporary copy of the SCTP DataChannel list because the
356 // DataChannel may callback to us and try to modify the list.
357 std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
358 temp_sctp_dcs.swap(sctp_data_channels_);
359 for (const auto& channel : temp_sctp_dcs) {
360 channel->OnTransportChannelClosed(error);
361 }
362 }
363
data_channel_transport() const364 DataChannelTransportInterface* DataChannelController::data_channel_transport()
365 const {
366 // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
367 // network thread.
368 // RTC_DCHECK_RUN_ON(network_thread());
369 return data_channel_transport_;
370 }
371
set_data_channel_transport(DataChannelTransportInterface * transport)372 void DataChannelController::set_data_channel_transport(
373 DataChannelTransportInterface* transport) {
374 RTC_DCHECK_RUN_ON(network_thread());
375 data_channel_transport_ = transport;
376 }
377
DataChannelSendData(int sid,const SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)378 bool DataChannelController::DataChannelSendData(
379 int sid,
380 const SendDataParams& params,
381 const rtc::CopyOnWriteBuffer& payload,
382 cricket::SendDataResult* result) {
383 // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
384 // thread instead. Remove the BlockingCall() below and move assocated state to
385 // the network thread.
386 RTC_DCHECK_RUN_ON(signaling_thread());
387 RTC_DCHECK(data_channel_transport());
388
389 RTCError error = network_thread()->BlockingCall([this, sid, params, payload] {
390 return data_channel_transport()->SendData(sid, params, payload);
391 });
392
393 if (error.ok()) {
394 *result = cricket::SendDataResult::SDR_SUCCESS;
395 return true;
396 } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
397 // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
398 // TODO(mellem): Stop using RTCError here and get rid of the mapping.
399 *result = cricket::SendDataResult::SDR_BLOCK;
400 return false;
401 }
402 *result = cricket::SendDataResult::SDR_ERROR;
403 return false;
404 }
405
NotifyDataChannelsOfTransportCreated()406 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
407 RTC_DCHECK_RUN_ON(network_thread());
408 signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
409 if (self) {
410 RTC_DCHECK_RUN_ON(self->signaling_thread());
411 for (const auto& channel : self->sctp_data_channels_) {
412 channel->OnTransportChannelCreated();
413 }
414 }
415 });
416 }
417
network_thread() const418 rtc::Thread* DataChannelController::network_thread() const {
419 return pc_->network_thread();
420 }
signaling_thread() const421 rtc::Thread* DataChannelController::signaling_thread() const {
422 return pc_->signaling_thread();
423 }
424
425 } // namespace webrtc
426