1 /*
2 * Copyright 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "pc/data_channel_controller.h"
12
13 #include <utility>
14
15 #include "pc/peer_connection.h"
16 #include "pc/sctp_utils.h"
17
18 namespace webrtc {
19
HasDataChannels() const20 bool DataChannelController::HasDataChannels() const {
21 RTC_DCHECK_RUN_ON(signaling_thread());
22 return !rtp_data_channels_.empty() || !sctp_data_channels_.empty();
23 }
24
SendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)25 bool DataChannelController::SendData(const cricket::SendDataParams& params,
26 const rtc::CopyOnWriteBuffer& payload,
27 cricket::SendDataResult* result) {
28 if (data_channel_transport())
29 return DataChannelSendData(params, payload, result);
30 if (rtp_data_channel())
31 return rtp_data_channel()->SendData(params, payload, result);
32 RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
33 return false;
34 }
35
ConnectDataChannel(RtpDataChannel * webrtc_data_channel)36 bool DataChannelController::ConnectDataChannel(
37 RtpDataChannel* webrtc_data_channel) {
38 RTC_DCHECK_RUN_ON(signaling_thread());
39 if (!rtp_data_channel()) {
40 // Don't log an error here, because DataChannels are expected to call
41 // ConnectDataChannel in this state. It's the only way to initially tell
42 // whether or not the underlying transport is ready.
43 return false;
44 }
45 rtp_data_channel()->SignalReadyToSendData.connect(
46 webrtc_data_channel, &RtpDataChannel::OnChannelReady);
47 rtp_data_channel()->SignalDataReceived.connect(
48 webrtc_data_channel, &RtpDataChannel::OnDataReceived);
49 return true;
50 }
51
DisconnectDataChannel(RtpDataChannel * webrtc_data_channel)52 void DataChannelController::DisconnectDataChannel(
53 RtpDataChannel* webrtc_data_channel) {
54 RTC_DCHECK_RUN_ON(signaling_thread());
55 if (!rtp_data_channel()) {
56 RTC_LOG(LS_ERROR)
57 << "DisconnectDataChannel called when rtp_data_channel_ is NULL.";
58 return;
59 }
60 rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel);
61 rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel);
62 }
63
ConnectDataChannel(SctpDataChannel * webrtc_data_channel)64 bool DataChannelController::ConnectDataChannel(
65 SctpDataChannel* webrtc_data_channel) {
66 RTC_DCHECK_RUN_ON(signaling_thread());
67 if (!data_channel_transport()) {
68 // Don't log an error here, because DataChannels are expected to call
69 // ConnectDataChannel in this state. It's the only way to initially tell
70 // whether or not the underlying transport is ready.
71 return false;
72 }
73 SignalDataChannelTransportWritable_s.connect(
74 webrtc_data_channel, &SctpDataChannel::OnTransportReady);
75 SignalDataChannelTransportReceivedData_s.connect(
76 webrtc_data_channel, &SctpDataChannel::OnDataReceived);
77 SignalDataChannelTransportChannelClosing_s.connect(
78 webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely);
79 SignalDataChannelTransportChannelClosed_s.connect(
80 webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete);
81 return true;
82 }
83
DisconnectDataChannel(SctpDataChannel * webrtc_data_channel)84 void DataChannelController::DisconnectDataChannel(
85 SctpDataChannel* webrtc_data_channel) {
86 RTC_DCHECK_RUN_ON(signaling_thread());
87 if (!data_channel_transport()) {
88 RTC_LOG(LS_ERROR)
89 << "DisconnectDataChannel called when sctp_transport_ is NULL.";
90 return;
91 }
92 SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
93 SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
94 SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
95 SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
96 }
97
AddSctpDataStream(int sid)98 void DataChannelController::AddSctpDataStream(int sid) {
99 if (data_channel_transport()) {
100 network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
101 if (data_channel_transport()) {
102 data_channel_transport()->OpenChannel(sid);
103 }
104 });
105 }
106 }
107
RemoveSctpDataStream(int sid)108 void DataChannelController::RemoveSctpDataStream(int sid) {
109 if (data_channel_transport()) {
110 network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
111 if (data_channel_transport()) {
112 data_channel_transport()->CloseChannel(sid);
113 }
114 });
115 }
116 }
117
ReadyToSendData() const118 bool DataChannelController::ReadyToSendData() const {
119 RTC_DCHECK_RUN_ON(signaling_thread());
120 return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
121 (data_channel_transport() && data_channel_transport_ready_to_send_);
122 }
123
OnDataReceived(int channel_id,DataMessageType type,const rtc::CopyOnWriteBuffer & buffer)124 void DataChannelController::OnDataReceived(
125 int channel_id,
126 DataMessageType type,
127 const rtc::CopyOnWriteBuffer& buffer) {
128 RTC_DCHECK_RUN_ON(network_thread());
129 cricket::ReceiveDataParams params;
130 params.sid = channel_id;
131 params.type = ToCricketDataMessageType(type);
132 data_channel_transport_invoker_->AsyncInvoke<void>(
133 RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
134 RTC_DCHECK_RUN_ON(signaling_thread());
135 // TODO(bugs.webrtc.org/11547): The data being received should be
136 // delivered on the network thread. The way HandleOpenMessage_s works
137 // right now is that it's called for all types of buffers and operates
138 // as a selector function. Change this so that it's only called for
139 // buffers that it should be able to handle. Once we do that, we can
140 // deliver all other buffers on the network thread (change
141 // SignalDataChannelTransportReceivedData_s to
142 // SignalDataChannelTransportReceivedData_n).
143 if (!HandleOpenMessage_s(params, buffer)) {
144 SignalDataChannelTransportReceivedData_s(params, buffer);
145 }
146 });
147 }
148
OnChannelClosing(int channel_id)149 void DataChannelController::OnChannelClosing(int channel_id) {
150 RTC_DCHECK_RUN_ON(network_thread());
151 data_channel_transport_invoker_->AsyncInvoke<void>(
152 RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
153 RTC_DCHECK_RUN_ON(signaling_thread());
154 SignalDataChannelTransportChannelClosing_s(channel_id);
155 });
156 }
157
OnChannelClosed(int channel_id)158 void DataChannelController::OnChannelClosed(int channel_id) {
159 RTC_DCHECK_RUN_ON(network_thread());
160 data_channel_transport_invoker_->AsyncInvoke<void>(
161 RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
162 RTC_DCHECK_RUN_ON(signaling_thread());
163 SignalDataChannelTransportChannelClosed_s(channel_id);
164 });
165 }
166
OnReadyToSend()167 void DataChannelController::OnReadyToSend() {
168 RTC_DCHECK_RUN_ON(network_thread());
169 data_channel_transport_invoker_->AsyncInvoke<void>(
170 RTC_FROM_HERE, signaling_thread(), [this] {
171 RTC_DCHECK_RUN_ON(signaling_thread());
172 data_channel_transport_ready_to_send_ = true;
173 SignalDataChannelTransportWritable_s(
174 data_channel_transport_ready_to_send_);
175 });
176 }
177
OnTransportClosed()178 void DataChannelController::OnTransportClosed() {
179 RTC_DCHECK_RUN_ON(network_thread());
180 data_channel_transport_invoker_->AsyncInvoke<void>(
181 RTC_FROM_HERE, signaling_thread(), [this] {
182 RTC_DCHECK_RUN_ON(signaling_thread());
183 OnTransportChannelClosed();
184 });
185 }
186
SetupDataChannelTransport_n()187 void DataChannelController::SetupDataChannelTransport_n() {
188 RTC_DCHECK_RUN_ON(network_thread());
189 data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
190
191 // There's a new data channel transport. This needs to be signaled to the
192 // |sctp_data_channels_| so that they can reopen and reconnect. This is
193 // necessary when bundling is applied.
194 NotifyDataChannelsOfTransportCreated();
195 }
196
TeardownDataChannelTransport_n()197 void DataChannelController::TeardownDataChannelTransport_n() {
198 RTC_DCHECK_RUN_ON(network_thread());
199 data_channel_transport_invoker_ = nullptr;
200 if (data_channel_transport()) {
201 data_channel_transport()->SetDataSink(nullptr);
202 }
203 set_data_channel_transport(nullptr);
204 }
205
OnTransportChanged(DataChannelTransportInterface * new_data_channel_transport)206 void DataChannelController::OnTransportChanged(
207 DataChannelTransportInterface* new_data_channel_transport) {
208 RTC_DCHECK_RUN_ON(network_thread());
209 if (data_channel_transport() &&
210 data_channel_transport() != new_data_channel_transport) {
211 // Changed which data channel transport is used for |sctp_mid_| (eg. now
212 // it's bundled).
213 data_channel_transport()->SetDataSink(nullptr);
214 set_data_channel_transport(new_data_channel_transport);
215 if (new_data_channel_transport) {
216 new_data_channel_transport->SetDataSink(this);
217
218 // There's a new data channel transport. This needs to be signaled to the
219 // |sctp_data_channels_| so that they can reopen and reconnect. This is
220 // necessary when bundling is applied.
221 NotifyDataChannelsOfTransportCreated();
222 }
223 }
224 }
225
GetDataChannelStats() const226 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
227 const {
228 RTC_DCHECK_RUN_ON(signaling_thread());
229 std::vector<DataChannelStats> stats;
230 stats.reserve(sctp_data_channels_.size());
231 for (const auto& channel : sctp_data_channels_)
232 stats.push_back(channel->GetStats());
233 return stats;
234 }
235
HandleOpenMessage_s(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & buffer)236 bool DataChannelController::HandleOpenMessage_s(
237 const cricket::ReceiveDataParams& params,
238 const rtc::CopyOnWriteBuffer& buffer) {
239 if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
240 // Received OPEN message; parse and signal that a new data channel should
241 // be created.
242 std::string label;
243 InternalDataChannelInit config;
244 config.id = params.ssrc;
245 if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
246 RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc "
247 << params.ssrc;
248 return true;
249 }
250 config.open_handshake_role = InternalDataChannelInit::kAcker;
251 OnDataChannelOpenMessage(label, config);
252 return true;
253 }
254 return false;
255 }
256
OnDataChannelOpenMessage(const std::string & label,const InternalDataChannelInit & config)257 void DataChannelController::OnDataChannelOpenMessage(
258 const std::string& label,
259 const InternalDataChannelInit& config) {
260 rtc::scoped_refptr<DataChannelInterface> channel(
261 InternalCreateDataChannelWithProxy(label, &config));
262 if (!channel.get()) {
263 RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
264 return;
265 }
266
267 pc_->Observer()->OnDataChannel(std::move(channel));
268 pc_->NoteDataAddedEvent();
269 }
270
271 rtc::scoped_refptr<DataChannelInterface>
InternalCreateDataChannelWithProxy(const std::string & label,const InternalDataChannelInit * config)272 DataChannelController::InternalCreateDataChannelWithProxy(
273 const std::string& label,
274 const InternalDataChannelInit* config) {
275 RTC_DCHECK_RUN_ON(signaling_thread());
276 if (pc_->IsClosed()) {
277 return nullptr;
278 }
279 if (data_channel_type_ == cricket::DCT_NONE) {
280 RTC_LOG(LS_ERROR)
281 << "InternalCreateDataChannel: Data is not supported in this call.";
282 return nullptr;
283 }
284 if (IsSctpLike(data_channel_type())) {
285 rtc::scoped_refptr<SctpDataChannel> channel =
286 InternalCreateSctpDataChannel(label, config);
287 if (channel) {
288 return SctpDataChannel::CreateProxy(channel);
289 }
290 } else if (data_channel_type() == cricket::DCT_RTP) {
291 rtc::scoped_refptr<RtpDataChannel> channel =
292 InternalCreateRtpDataChannel(label, config);
293 if (channel) {
294 return RtpDataChannel::CreateProxy(channel);
295 }
296 }
297
298 return nullptr;
299 }
300
301 rtc::scoped_refptr<RtpDataChannel>
InternalCreateRtpDataChannel(const std::string & label,const DataChannelInit * config)302 DataChannelController::InternalCreateRtpDataChannel(
303 const std::string& label,
304 const DataChannelInit* config) {
305 RTC_DCHECK_RUN_ON(signaling_thread());
306 DataChannelInit new_config = config ? (*config) : DataChannelInit();
307 rtc::scoped_refptr<RtpDataChannel> channel(
308 RtpDataChannel::Create(this, label, new_config, signaling_thread()));
309 if (!channel) {
310 return nullptr;
311 }
312 if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) {
313 RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
314 << " already exists.";
315 return nullptr;
316 }
317 rtp_data_channels_[channel->label()] = channel;
318 SignalRtpDataChannelCreated_(channel.get());
319 return channel;
320 }
321
322 rtc::scoped_refptr<SctpDataChannel>
InternalCreateSctpDataChannel(const std::string & label,const InternalDataChannelInit * config)323 DataChannelController::InternalCreateSctpDataChannel(
324 const std::string& label,
325 const InternalDataChannelInit* config) {
326 RTC_DCHECK_RUN_ON(signaling_thread());
327 InternalDataChannelInit new_config =
328 config ? (*config) : InternalDataChannelInit();
329 if (new_config.id < 0) {
330 rtc::SSLRole role;
331 if ((pc_->GetSctpSslRole(&role)) &&
332 !sid_allocator_.AllocateSid(role, &new_config.id)) {
333 RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel.";
334 return nullptr;
335 }
336 } else if (!sid_allocator_.ReserveSid(new_config.id)) {
337 RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
338 "because the id is already in use or out of range.";
339 return nullptr;
340 }
341 rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
342 this, label, new_config, signaling_thread(), network_thread()));
343 if (!channel) {
344 sid_allocator_.ReleaseSid(new_config.id);
345 return nullptr;
346 }
347 sctp_data_channels_.push_back(channel);
348 channel->SignalClosed.connect(pc_, &PeerConnection::OnSctpDataChannelClosed);
349 SignalSctpDataChannelCreated_(channel.get());
350 return channel;
351 }
352
AllocateSctpSids(rtc::SSLRole role)353 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
354 RTC_DCHECK_RUN_ON(signaling_thread());
355 std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
356 for (const auto& channel : sctp_data_channels_) {
357 if (channel->id() < 0) {
358 int sid;
359 if (!sid_allocator_.AllocateSid(role, &sid)) {
360 RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
361 channels_to_close.push_back(channel);
362 continue;
363 }
364 channel->SetSctpSid(sid);
365 }
366 }
367 // Since closing modifies the list of channels, we have to do the actual
368 // closing outside the loop.
369 for (const auto& channel : channels_to_close) {
370 channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
371 }
372 }
373
OnSctpDataChannelClosed(SctpDataChannel * channel)374 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
375 RTC_DCHECK_RUN_ON(signaling_thread());
376 for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
377 ++it) {
378 if (it->get() == channel) {
379 if (channel->id() >= 0) {
380 // After the closing procedure is done, it's safe to use this ID for
381 // another data channel.
382 sid_allocator_.ReleaseSid(channel->id());
383 }
384 // Since this method is triggered by a signal from the DataChannel,
385 // we can't free it directly here; we need to free it asynchronously.
386 sctp_data_channels_to_free_.push_back(*it);
387 sctp_data_channels_.erase(it);
388 signaling_thread()->PostTask(
389 RTC_FROM_HERE, [self = weak_factory_.GetWeakPtr()] {
390 if (self) {
391 RTC_DCHECK_RUN_ON(self->signaling_thread());
392 self->sctp_data_channels_to_free_.clear();
393 }
394 });
395 return;
396 }
397 }
398 }
399
OnTransportChannelClosed()400 void DataChannelController::OnTransportChannelClosed() {
401 RTC_DCHECK_RUN_ON(signaling_thread());
402 // Use a temporary copy of the RTP/SCTP DataChannel list because the
403 // DataChannel may callback to us and try to modify the list.
404 std::map<std::string, rtc::scoped_refptr<RtpDataChannel>> temp_rtp_dcs;
405 temp_rtp_dcs.swap(rtp_data_channels_);
406 for (const auto& kv : temp_rtp_dcs) {
407 kv.second->OnTransportChannelClosed();
408 }
409
410 std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
411 temp_sctp_dcs.swap(sctp_data_channels_);
412 for (const auto& channel : temp_sctp_dcs) {
413 channel->OnTransportChannelClosed();
414 }
415 }
416
FindDataChannelBySid(int sid) const417 SctpDataChannel* DataChannelController::FindDataChannelBySid(int sid) const {
418 RTC_DCHECK_RUN_ON(signaling_thread());
419 for (const auto& channel : sctp_data_channels_) {
420 if (channel->id() == sid) {
421 return channel;
422 }
423 }
424 return nullptr;
425 }
426
UpdateLocalRtpDataChannels(const cricket::StreamParamsVec & streams)427 void DataChannelController::UpdateLocalRtpDataChannels(
428 const cricket::StreamParamsVec& streams) {
429 std::vector<std::string> existing_channels;
430
431 RTC_DCHECK_RUN_ON(signaling_thread());
432 // Find new and active data channels.
433 for (const cricket::StreamParams& params : streams) {
434 // |it->sync_label| is actually the data channel label. The reason is that
435 // we use the same naming of data channels as we do for
436 // MediaStreams and Tracks.
437 // For MediaStreams, the sync_label is the MediaStream label and the
438 // track label is the same as |streamid|.
439 const std::string& channel_label = params.first_stream_id();
440 auto data_channel_it = rtp_data_channels()->find(channel_label);
441 if (data_channel_it == rtp_data_channels()->end()) {
442 RTC_LOG(LS_ERROR) << "channel label not found";
443 continue;
444 }
445 // Set the SSRC the data channel should use for sending.
446 data_channel_it->second->SetSendSsrc(params.first_ssrc());
447 existing_channels.push_back(data_channel_it->first);
448 }
449
450 UpdateClosingRtpDataChannels(existing_channels, true);
451 }
452
UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec & streams)453 void DataChannelController::UpdateRemoteRtpDataChannels(
454 const cricket::StreamParamsVec& streams) {
455 RTC_DCHECK_RUN_ON(signaling_thread());
456
457 std::vector<std::string> existing_channels;
458
459 // Find new and active data channels.
460 for (const cricket::StreamParams& params : streams) {
461 // The data channel label is either the mslabel or the SSRC if the mslabel
462 // does not exist. Ex a=ssrc:444330170 mslabel:test1.
463 std::string label = params.first_stream_id().empty()
464 ? rtc::ToString(params.first_ssrc())
465 : params.first_stream_id();
466 auto data_channel_it = rtp_data_channels()->find(label);
467 if (data_channel_it == rtp_data_channels()->end()) {
468 // This is a new data channel.
469 CreateRemoteRtpDataChannel(label, params.first_ssrc());
470 } else {
471 data_channel_it->second->SetReceiveSsrc(params.first_ssrc());
472 }
473 existing_channels.push_back(label);
474 }
475
476 UpdateClosingRtpDataChannels(existing_channels, false);
477 }
478
data_channel_type() const479 cricket::DataChannelType DataChannelController::data_channel_type() const {
480 // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
481 // RTC_DCHECK_RUN_ON(signaling_thread());
482 return data_channel_type_;
483 }
484
set_data_channel_type(cricket::DataChannelType type)485 void DataChannelController::set_data_channel_type(
486 cricket::DataChannelType type) {
487 RTC_DCHECK_RUN_ON(signaling_thread());
488 data_channel_type_ = type;
489 }
490
data_channel_transport() const491 DataChannelTransportInterface* DataChannelController::data_channel_transport()
492 const {
493 // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
494 // network thread.
495 // RTC_DCHECK_RUN_ON(network_thread());
496 return data_channel_transport_;
497 }
498
set_data_channel_transport(DataChannelTransportInterface * transport)499 void DataChannelController::set_data_channel_transport(
500 DataChannelTransportInterface* transport) {
501 RTC_DCHECK_RUN_ON(network_thread());
502 data_channel_transport_ = transport;
503 }
504
505 const std::map<std::string, rtc::scoped_refptr<RtpDataChannel>>*
rtp_data_channels() const506 DataChannelController::rtp_data_channels() const {
507 RTC_DCHECK_RUN_ON(signaling_thread());
508 return &rtp_data_channels_;
509 }
510
UpdateClosingRtpDataChannels(const std::vector<std::string> & active_channels,bool is_local_update)511 void DataChannelController::UpdateClosingRtpDataChannels(
512 const std::vector<std::string>& active_channels,
513 bool is_local_update) {
514 auto it = rtp_data_channels_.begin();
515 while (it != rtp_data_channels_.end()) {
516 RtpDataChannel* data_channel = it->second;
517 if (absl::c_linear_search(active_channels, data_channel->label())) {
518 ++it;
519 continue;
520 }
521
522 if (is_local_update) {
523 data_channel->SetSendSsrc(0);
524 } else {
525 data_channel->RemotePeerRequestClose();
526 }
527
528 if (data_channel->state() == RtpDataChannel::kClosed) {
529 rtp_data_channels_.erase(it);
530 it = rtp_data_channels_.begin();
531 } else {
532 ++it;
533 }
534 }
535 }
536
CreateRemoteRtpDataChannel(const std::string & label,uint32_t remote_ssrc)537 void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
538 uint32_t remote_ssrc) {
539 if (data_channel_type() != cricket::DCT_RTP) {
540 return;
541 }
542 rtc::scoped_refptr<RtpDataChannel> channel(
543 InternalCreateRtpDataChannel(label, nullptr));
544 if (!channel.get()) {
545 RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but"
546 "CreateDataChannel failed.";
547 return;
548 }
549 channel->SetReceiveSsrc(remote_ssrc);
550 rtc::scoped_refptr<DataChannelInterface> proxy_channel =
551 RtpDataChannel::CreateProxy(std::move(channel));
552 pc_->Observer()->OnDataChannel(std::move(proxy_channel));
553 }
554
DataChannelSendData(const cricket::SendDataParams & params,const rtc::CopyOnWriteBuffer & payload,cricket::SendDataResult * result)555 bool DataChannelController::DataChannelSendData(
556 const cricket::SendDataParams& params,
557 const rtc::CopyOnWriteBuffer& payload,
558 cricket::SendDataResult* result) {
559 // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
560 // thread instead. Remove the Invoke() below and move assocated state to
561 // the network thread.
562 RTC_DCHECK_RUN_ON(signaling_thread());
563 RTC_DCHECK(data_channel_transport());
564
565 SendDataParams send_params;
566 send_params.type = ToWebrtcDataMessageType(params.type);
567 send_params.ordered = params.ordered;
568 if (params.max_rtx_count >= 0) {
569 send_params.max_rtx_count = params.max_rtx_count;
570 } else if (params.max_rtx_ms >= 0) {
571 send_params.max_rtx_ms = params.max_rtx_ms;
572 }
573
574 RTCError error = network_thread()->Invoke<RTCError>(
575 RTC_FROM_HERE, [this, params, send_params, payload] {
576 return data_channel_transport()->SendData(params.sid, send_params,
577 payload);
578 });
579
580 if (error.ok()) {
581 *result = cricket::SendDataResult::SDR_SUCCESS;
582 return true;
583 } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
584 // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
585 // TODO(mellem): Stop using RTCError here and get rid of the mapping.
586 *result = cricket::SendDataResult::SDR_BLOCK;
587 return false;
588 }
589 *result = cricket::SendDataResult::SDR_ERROR;
590 return false;
591 }
592
NotifyDataChannelsOfTransportCreated()593 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
594 RTC_DCHECK_RUN_ON(network_thread());
595 data_channel_transport_invoker_->AsyncInvoke<void>(
596 RTC_FROM_HERE, signaling_thread(), [this] {
597 RTC_DCHECK_RUN_ON(signaling_thread());
598 for (const auto& channel : sctp_data_channels_) {
599 channel->OnTransportChannelCreated();
600 }
601 });
602 }
603
network_thread() const604 rtc::Thread* DataChannelController::network_thread() const {
605 return pc_->network_thread();
606 }
signaling_thread() const607 rtc::Thread* DataChannelController::signaling_thread() const {
608 return pc_->signaling_thread();
609 }
610
611 } // namespace webrtc
612