1 /*
2 * Copyright 2020 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
#include "pc/rtp_data_channel.h"

#include <atomic>
#include <memory>
#include <string>
#include <utility>

#include "api/proxy.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/thread.h"
23
24 namespace webrtc {
25
26 namespace {
27
// Upper bound, in bytes, on received data that may be held in the receive
// queue while it cannot be delivered. OnDataReceived() closes the channel with
// an error once queueing another payload would exceed this limit.
// A compile-time constant: nothing in this file is meant to modify the limit.
constexpr size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
29
// Counter backing the internal ids handed to newly constructed channels.
// Atomic so that increments and resets are well-defined regardless of the
// calling thread. Lives in the unnamed namespace, so it already has internal
// linkage without an extra `static`.
std::atomic<int> g_unique_id{0};

// Returns the next internal id: atomically increments the counter and yields
// the incremented value (the first id after the counter holds 0 is 1).
int GenerateUniqueId() {
  return ++g_unique_id;
}
35
36 // Define proxy for DataChannelInterface.
37 BEGIN_SIGNALING_PROXY_MAP(DataChannel)
38 PROXY_SIGNALING_THREAD_DESTRUCTOR()
39 PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
40 PROXY_METHOD0(void, UnregisterObserver)
41 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
42 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
43 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
44 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
45 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
46 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
47 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
48 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
49 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
50 // Can't bypass the proxy since the id may change.
51 PROXY_CONSTMETHOD0(int, id)
52 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
53 PROXY_CONSTMETHOD0(DataState, state)
54 PROXY_CONSTMETHOD0(RTCError, error)
55 PROXY_CONSTMETHOD0(uint32_t, messages_sent)
56 PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
57 PROXY_CONSTMETHOD0(uint32_t, messages_received)
58 PROXY_CONSTMETHOD0(uint64_t, bytes_received)
59 PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
60 PROXY_METHOD0(void, Close)
61 // TODO(bugs.webrtc.org/11547): Change to run on the network thread.
62 PROXY_METHOD1(bool, Send, const DataBuffer&)
63 END_PROXY_MAP()
64
65 } // namespace
66
Create(RtpDataChannelProviderInterface * provider,const std::string & label,const DataChannelInit & config,rtc::Thread * signaling_thread)67 rtc::scoped_refptr<RtpDataChannel> RtpDataChannel::Create(
68 RtpDataChannelProviderInterface* provider,
69 const std::string& label,
70 const DataChannelInit& config,
71 rtc::Thread* signaling_thread) {
72 rtc::scoped_refptr<RtpDataChannel> channel(
73 new rtc::RefCountedObject<RtpDataChannel>(config, provider, label,
74 signaling_thread));
75 if (!channel->Init()) {
76 return nullptr;
77 }
78 return channel;
79 }
80
81 // static
CreateProxy(rtc::scoped_refptr<RtpDataChannel> channel)82 rtc::scoped_refptr<DataChannelInterface> RtpDataChannel::CreateProxy(
83 rtc::scoped_refptr<RtpDataChannel> channel) {
84 return DataChannelProxy::Create(channel->signaling_thread_, channel.get());
85 }
86
RtpDataChannel(const DataChannelInit & config,RtpDataChannelProviderInterface * provider,const std::string & label,rtc::Thread * signaling_thread)87 RtpDataChannel::RtpDataChannel(const DataChannelInit& config,
88 RtpDataChannelProviderInterface* provider,
89 const std::string& label,
90 rtc::Thread* signaling_thread)
91 : signaling_thread_(signaling_thread),
92 internal_id_(GenerateUniqueId()),
93 label_(label),
94 config_(config),
95 provider_(provider) {
96 RTC_DCHECK_RUN_ON(signaling_thread_);
97 }
98
Init()99 bool RtpDataChannel::Init() {
100 RTC_DCHECK_RUN_ON(signaling_thread_);
101 if (config_.reliable || config_.id != -1 || config_.maxRetransmits ||
102 config_.maxRetransmitTime) {
103 RTC_LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
104 "invalid DataChannelInit.";
105 return false;
106 }
107
108 return true;
109 }
110
~RtpDataChannel()111 RtpDataChannel::~RtpDataChannel() {
112 RTC_DCHECK_RUN_ON(signaling_thread_);
113 }
114
RegisterObserver(DataChannelObserver * observer)115 void RtpDataChannel::RegisterObserver(DataChannelObserver* observer) {
116 RTC_DCHECK_RUN_ON(signaling_thread_);
117 observer_ = observer;
118 DeliverQueuedReceivedData();
119 }
120
UnregisterObserver()121 void RtpDataChannel::UnregisterObserver() {
122 RTC_DCHECK_RUN_ON(signaling_thread_);
123 observer_ = nullptr;
124 }
125
Close()126 void RtpDataChannel::Close() {
127 RTC_DCHECK_RUN_ON(signaling_thread_);
128 if (state_ == kClosed)
129 return;
130 send_ssrc_ = 0;
131 send_ssrc_set_ = false;
132 SetState(kClosing);
133 UpdateState();
134 }
135
state() const136 RtpDataChannel::DataState RtpDataChannel::state() const {
137 RTC_DCHECK_RUN_ON(signaling_thread_);
138 return state_;
139 }
140
error() const141 RTCError RtpDataChannel::error() const {
142 RTC_DCHECK_RUN_ON(signaling_thread_);
143 return error_;
144 }
145
messages_sent() const146 uint32_t RtpDataChannel::messages_sent() const {
147 RTC_DCHECK_RUN_ON(signaling_thread_);
148 return messages_sent_;
149 }
150
bytes_sent() const151 uint64_t RtpDataChannel::bytes_sent() const {
152 RTC_DCHECK_RUN_ON(signaling_thread_);
153 return bytes_sent_;
154 }
155
messages_received() const156 uint32_t RtpDataChannel::messages_received() const {
157 RTC_DCHECK_RUN_ON(signaling_thread_);
158 return messages_received_;
159 }
160
bytes_received() const161 uint64_t RtpDataChannel::bytes_received() const {
162 RTC_DCHECK_RUN_ON(signaling_thread_);
163 return bytes_received_;
164 }
165
Send(const DataBuffer & buffer)166 bool RtpDataChannel::Send(const DataBuffer& buffer) {
167 RTC_DCHECK_RUN_ON(signaling_thread_);
168
169 if (state_ != kOpen) {
170 return false;
171 }
172
173 // TODO(jiayl): the spec is unclear about if the remote side should get the
174 // onmessage event. We need to figure out the expected behavior and change the
175 // code accordingly.
176 if (buffer.size() == 0) {
177 return true;
178 }
179
180 return SendDataMessage(buffer);
181 }
182
SetReceiveSsrc(uint32_t receive_ssrc)183 void RtpDataChannel::SetReceiveSsrc(uint32_t receive_ssrc) {
184 RTC_DCHECK_RUN_ON(signaling_thread_);
185
186 if (receive_ssrc_set_) {
187 return;
188 }
189 receive_ssrc_ = receive_ssrc;
190 receive_ssrc_set_ = true;
191 UpdateState();
192 }
193
OnTransportChannelClosed()194 void RtpDataChannel::OnTransportChannelClosed() {
195 RTCError error = RTCError(RTCErrorType::OPERATION_ERROR_WITH_DATA,
196 "Transport channel closed");
197 CloseAbruptlyWithError(std::move(error));
198 }
199
GetStats() const200 DataChannelStats RtpDataChannel::GetStats() const {
201 RTC_DCHECK_RUN_ON(signaling_thread_);
202 DataChannelStats stats{internal_id_, id(), label(),
203 protocol(), state(), messages_sent(),
204 messages_received(), bytes_sent(), bytes_received()};
205 return stats;
206 }
207
208 // The remote peer request that this channel shall be closed.
RemotePeerRequestClose()209 void RtpDataChannel::RemotePeerRequestClose() {
210 // Close with error code explicitly set to OK.
211 CloseAbruptlyWithError(RTCError());
212 }
213
SetSendSsrc(uint32_t send_ssrc)214 void RtpDataChannel::SetSendSsrc(uint32_t send_ssrc) {
215 RTC_DCHECK_RUN_ON(signaling_thread_);
216 if (send_ssrc_set_) {
217 return;
218 }
219 send_ssrc_ = send_ssrc;
220 send_ssrc_set_ = true;
221 UpdateState();
222 }
223
OnDataReceived(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & payload)224 void RtpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
225 const rtc::CopyOnWriteBuffer& payload) {
226 RTC_DCHECK_RUN_ON(signaling_thread_);
227 if (params.ssrc != receive_ssrc_) {
228 return;
229 }
230
231 RTC_DCHECK(params.type == cricket::DMT_BINARY ||
232 params.type == cricket::DMT_TEXT);
233
234 RTC_LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
235 << params.sid;
236
237 bool binary = (params.type == cricket::DMT_BINARY);
238 auto buffer = std::make_unique<DataBuffer>(payload, binary);
239 if (state_ == kOpen && observer_) {
240 ++messages_received_;
241 bytes_received_ += buffer->size();
242 observer_->OnMessage(*buffer.get());
243 } else {
244 if (queued_received_data_.byte_count() + payload.size() >
245 kMaxQueuedReceivedDataBytes) {
246 RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
247
248 queued_received_data_.Clear();
249 CloseAbruptlyWithError(
250 RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
251 "Queued received data exceeds the max buffer size."));
252
253 return;
254 }
255 queued_received_data_.PushBack(std::move(buffer));
256 }
257 }
258
OnChannelReady(bool writable)259 void RtpDataChannel::OnChannelReady(bool writable) {
260 RTC_DCHECK_RUN_ON(signaling_thread_);
261
262 writable_ = writable;
263 if (!writable) {
264 return;
265 }
266
267 UpdateState();
268 }
269
CloseAbruptlyWithError(RTCError error)270 void RtpDataChannel::CloseAbruptlyWithError(RTCError error) {
271 RTC_DCHECK_RUN_ON(signaling_thread_);
272
273 if (state_ == kClosed) {
274 return;
275 }
276
277 if (connected_to_provider_) {
278 DisconnectFromProvider();
279 }
280
281 // Still go to "kClosing" before "kClosed", since observers may be expecting
282 // that.
283 SetState(kClosing);
284 error_ = std::move(error);
285 SetState(kClosed);
286 }
287
UpdateState()288 void RtpDataChannel::UpdateState() {
289 RTC_DCHECK_RUN_ON(signaling_thread_);
290 // UpdateState determines what to do from a few state variables. Include
291 // all conditions required for each state transition here for
292 // clarity.
293 switch (state_) {
294 case kConnecting: {
295 if (send_ssrc_set_ == receive_ssrc_set_) {
296 if (!connected_to_provider_) {
297 connected_to_provider_ = provider_->ConnectDataChannel(this);
298 }
299 if (connected_to_provider_ && writable_) {
300 SetState(kOpen);
301 // If we have received buffers before the channel got writable.
302 // Deliver them now.
303 DeliverQueuedReceivedData();
304 }
305 }
306 break;
307 }
308 case kOpen: {
309 break;
310 }
311 case kClosing: {
312 // For RTP data channels, we can go to "closed" after we finish
313 // sending data and the send/recv SSRCs are unset.
314 if (connected_to_provider_) {
315 DisconnectFromProvider();
316 }
317 if (!send_ssrc_set_ && !receive_ssrc_set_) {
318 SetState(kClosed);
319 }
320 break;
321 }
322 case kClosed:
323 break;
324 }
325 }
326
SetState(DataState state)327 void RtpDataChannel::SetState(DataState state) {
328 RTC_DCHECK_RUN_ON(signaling_thread_);
329 if (state_ == state) {
330 return;
331 }
332
333 state_ = state;
334 if (observer_) {
335 observer_->OnStateChange();
336 }
337 if (state_ == kOpen) {
338 SignalOpened(this);
339 } else if (state_ == kClosed) {
340 SignalClosed(this);
341 }
342 }
343
DisconnectFromProvider()344 void RtpDataChannel::DisconnectFromProvider() {
345 RTC_DCHECK_RUN_ON(signaling_thread_);
346 if (!connected_to_provider_)
347 return;
348
349 provider_->DisconnectDataChannel(this);
350 connected_to_provider_ = false;
351 }
352
DeliverQueuedReceivedData()353 void RtpDataChannel::DeliverQueuedReceivedData() {
354 RTC_DCHECK_RUN_ON(signaling_thread_);
355 if (!observer_) {
356 return;
357 }
358
359 while (!queued_received_data_.Empty()) {
360 std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
361 ++messages_received_;
362 bytes_received_ += buffer->size();
363 observer_->OnMessage(*buffer);
364 }
365 }
366
SendDataMessage(const DataBuffer & buffer)367 bool RtpDataChannel::SendDataMessage(const DataBuffer& buffer) {
368 RTC_DCHECK_RUN_ON(signaling_thread_);
369 cricket::SendDataParams send_params;
370
371 send_params.ssrc = send_ssrc_;
372 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
373
374 cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
375 bool success = provider_->SendData(send_params, buffer.data, &send_result);
376
377 if (success) {
378 ++messages_sent_;
379 bytes_sent_ += buffer.size();
380 if (observer_ && buffer.size() > 0) {
381 observer_->OnBufferedAmountChange(buffer.size());
382 }
383 return true;
384 }
385
386 return false;
387 }
388
389 // static
ResetInternalIdAllocatorForTesting(int new_value)390 void RtpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
391 g_unique_id = new_value;
392 }
393
394 } // namespace webrtc
395