• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2020 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "pc/rtp_data_channel.h"
12 
13 #include <memory>
14 #include <string>
15 #include <utility>
16 
17 #include "api/proxy.h"
18 #include "rtc_base/checks.h"
19 #include "rtc_base/location.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/ref_counted_object.h"
22 #include "rtc_base/thread.h"
23 
24 namespace webrtc {
25 
26 namespace {
27 
28 static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
29 
30 static std::atomic<int> g_unique_id{0};
31 
GenerateUniqueId()32 int GenerateUniqueId() {
33   return ++g_unique_id;
34 }
35 
36 // Define proxy for DataChannelInterface.
37 BEGIN_SIGNALING_PROXY_MAP(DataChannel)
38 PROXY_SIGNALING_THREAD_DESTRUCTOR()
39 PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
40 PROXY_METHOD0(void, UnregisterObserver)
41 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
42 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
43 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
44 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
45 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
46 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
47 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
48 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
49 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
50 // Can't bypass the proxy since the id may change.
51 PROXY_CONSTMETHOD0(int, id)
52 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
53 PROXY_CONSTMETHOD0(DataState, state)
54 PROXY_CONSTMETHOD0(RTCError, error)
55 PROXY_CONSTMETHOD0(uint32_t, messages_sent)
56 PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
57 PROXY_CONSTMETHOD0(uint32_t, messages_received)
58 PROXY_CONSTMETHOD0(uint64_t, bytes_received)
59 PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
60 PROXY_METHOD0(void, Close)
61 // TODO(bugs.webrtc.org/11547): Change to run on the network thread.
62 PROXY_METHOD1(bool, Send, const DataBuffer&)
63 END_PROXY_MAP()
64 
65 }  // namespace
66 
Create(RtpDataChannelProviderInterface * provider,const std::string & label,const DataChannelInit & config,rtc::Thread * signaling_thread)67 rtc::scoped_refptr<RtpDataChannel> RtpDataChannel::Create(
68     RtpDataChannelProviderInterface* provider,
69     const std::string& label,
70     const DataChannelInit& config,
71     rtc::Thread* signaling_thread) {
72   rtc::scoped_refptr<RtpDataChannel> channel(
73       new rtc::RefCountedObject<RtpDataChannel>(config, provider, label,
74                                                 signaling_thread));
75   if (!channel->Init()) {
76     return nullptr;
77   }
78   return channel;
79 }
80 
81 // static
CreateProxy(rtc::scoped_refptr<RtpDataChannel> channel)82 rtc::scoped_refptr<DataChannelInterface> RtpDataChannel::CreateProxy(
83     rtc::scoped_refptr<RtpDataChannel> channel) {
84   return DataChannelProxy::Create(channel->signaling_thread_, channel.get());
85 }
86 
RtpDataChannel(const DataChannelInit & config,RtpDataChannelProviderInterface * provider,const std::string & label,rtc::Thread * signaling_thread)87 RtpDataChannel::RtpDataChannel(const DataChannelInit& config,
88                                RtpDataChannelProviderInterface* provider,
89                                const std::string& label,
90                                rtc::Thread* signaling_thread)
91     : signaling_thread_(signaling_thread),
92       internal_id_(GenerateUniqueId()),
93       label_(label),
94       config_(config),
95       provider_(provider) {
96   RTC_DCHECK_RUN_ON(signaling_thread_);
97 }
98 
Init()99 bool RtpDataChannel::Init() {
100   RTC_DCHECK_RUN_ON(signaling_thread_);
101   if (config_.reliable || config_.id != -1 || config_.maxRetransmits ||
102       config_.maxRetransmitTime) {
103     RTC_LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
104                          "invalid DataChannelInit.";
105     return false;
106   }
107 
108   return true;
109 }
110 
~RtpDataChannel()111 RtpDataChannel::~RtpDataChannel() {
112   RTC_DCHECK_RUN_ON(signaling_thread_);
113 }
114 
RegisterObserver(DataChannelObserver * observer)115 void RtpDataChannel::RegisterObserver(DataChannelObserver* observer) {
116   RTC_DCHECK_RUN_ON(signaling_thread_);
117   observer_ = observer;
118   DeliverQueuedReceivedData();
119 }
120 
UnregisterObserver()121 void RtpDataChannel::UnregisterObserver() {
122   RTC_DCHECK_RUN_ON(signaling_thread_);
123   observer_ = nullptr;
124 }
125 
Close()126 void RtpDataChannel::Close() {
127   RTC_DCHECK_RUN_ON(signaling_thread_);
128   if (state_ == kClosed)
129     return;
130   send_ssrc_ = 0;
131   send_ssrc_set_ = false;
132   SetState(kClosing);
133   UpdateState();
134 }
135 
state() const136 RtpDataChannel::DataState RtpDataChannel::state() const {
137   RTC_DCHECK_RUN_ON(signaling_thread_);
138   return state_;
139 }
140 
error() const141 RTCError RtpDataChannel::error() const {
142   RTC_DCHECK_RUN_ON(signaling_thread_);
143   return error_;
144 }
145 
messages_sent() const146 uint32_t RtpDataChannel::messages_sent() const {
147   RTC_DCHECK_RUN_ON(signaling_thread_);
148   return messages_sent_;
149 }
150 
bytes_sent() const151 uint64_t RtpDataChannel::bytes_sent() const {
152   RTC_DCHECK_RUN_ON(signaling_thread_);
153   return bytes_sent_;
154 }
155 
messages_received() const156 uint32_t RtpDataChannel::messages_received() const {
157   RTC_DCHECK_RUN_ON(signaling_thread_);
158   return messages_received_;
159 }
160 
bytes_received() const161 uint64_t RtpDataChannel::bytes_received() const {
162   RTC_DCHECK_RUN_ON(signaling_thread_);
163   return bytes_received_;
164 }
165 
Send(const DataBuffer & buffer)166 bool RtpDataChannel::Send(const DataBuffer& buffer) {
167   RTC_DCHECK_RUN_ON(signaling_thread_);
168 
169   if (state_ != kOpen) {
170     return false;
171   }
172 
173   // TODO(jiayl): the spec is unclear about if the remote side should get the
174   // onmessage event. We need to figure out the expected behavior and change the
175   // code accordingly.
176   if (buffer.size() == 0) {
177     return true;
178   }
179 
180   return SendDataMessage(buffer);
181 }
182 
SetReceiveSsrc(uint32_t receive_ssrc)183 void RtpDataChannel::SetReceiveSsrc(uint32_t receive_ssrc) {
184   RTC_DCHECK_RUN_ON(signaling_thread_);
185 
186   if (receive_ssrc_set_) {
187     return;
188   }
189   receive_ssrc_ = receive_ssrc;
190   receive_ssrc_set_ = true;
191   UpdateState();
192 }
193 
OnTransportChannelClosed()194 void RtpDataChannel::OnTransportChannelClosed() {
195   RTCError error = RTCError(RTCErrorType::OPERATION_ERROR_WITH_DATA,
196                             "Transport channel closed");
197   CloseAbruptlyWithError(std::move(error));
198 }
199 
GetStats() const200 DataChannelStats RtpDataChannel::GetStats() const {
201   RTC_DCHECK_RUN_ON(signaling_thread_);
202   DataChannelStats stats{internal_id_,        id(),         label(),
203                          protocol(),          state(),      messages_sent(),
204                          messages_received(), bytes_sent(), bytes_received()};
205   return stats;
206 }
207 
208 // The remote peer request that this channel shall be closed.
RemotePeerRequestClose()209 void RtpDataChannel::RemotePeerRequestClose() {
210   // Close with error code explicitly set to OK.
211   CloseAbruptlyWithError(RTCError());
212 }
213 
SetSendSsrc(uint32_t send_ssrc)214 void RtpDataChannel::SetSendSsrc(uint32_t send_ssrc) {
215   RTC_DCHECK_RUN_ON(signaling_thread_);
216   if (send_ssrc_set_) {
217     return;
218   }
219   send_ssrc_ = send_ssrc;
220   send_ssrc_set_ = true;
221   UpdateState();
222 }
223 
OnDataReceived(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & payload)224 void RtpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
225                                     const rtc::CopyOnWriteBuffer& payload) {
226   RTC_DCHECK_RUN_ON(signaling_thread_);
227   if (params.ssrc != receive_ssrc_) {
228     return;
229   }
230 
231   RTC_DCHECK(params.type == cricket::DMT_BINARY ||
232              params.type == cricket::DMT_TEXT);
233 
234   RTC_LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
235                       << params.sid;
236 
237   bool binary = (params.type == cricket::DMT_BINARY);
238   auto buffer = std::make_unique<DataBuffer>(payload, binary);
239   if (state_ == kOpen && observer_) {
240     ++messages_received_;
241     bytes_received_ += buffer->size();
242     observer_->OnMessage(*buffer.get());
243   } else {
244     if (queued_received_data_.byte_count() + payload.size() >
245         kMaxQueuedReceivedDataBytes) {
246       RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
247 
248       queued_received_data_.Clear();
249       CloseAbruptlyWithError(
250           RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
251                    "Queued received data exceeds the max buffer size."));
252 
253       return;
254     }
255     queued_received_data_.PushBack(std::move(buffer));
256   }
257 }
258 
OnChannelReady(bool writable)259 void RtpDataChannel::OnChannelReady(bool writable) {
260   RTC_DCHECK_RUN_ON(signaling_thread_);
261 
262   writable_ = writable;
263   if (!writable) {
264     return;
265   }
266 
267   UpdateState();
268 }
269 
CloseAbruptlyWithError(RTCError error)270 void RtpDataChannel::CloseAbruptlyWithError(RTCError error) {
271   RTC_DCHECK_RUN_ON(signaling_thread_);
272 
273   if (state_ == kClosed) {
274     return;
275   }
276 
277   if (connected_to_provider_) {
278     DisconnectFromProvider();
279   }
280 
281   // Still go to "kClosing" before "kClosed", since observers may be expecting
282   // that.
283   SetState(kClosing);
284   error_ = std::move(error);
285   SetState(kClosed);
286 }
287 
UpdateState()288 void RtpDataChannel::UpdateState() {
289   RTC_DCHECK_RUN_ON(signaling_thread_);
290   // UpdateState determines what to do from a few state variables.  Include
291   // all conditions required for each state transition here for
292   // clarity.
293   switch (state_) {
294     case kConnecting: {
295       if (send_ssrc_set_ == receive_ssrc_set_) {
296         if (!connected_to_provider_) {
297           connected_to_provider_ = provider_->ConnectDataChannel(this);
298         }
299         if (connected_to_provider_ && writable_) {
300           SetState(kOpen);
301           // If we have received buffers before the channel got writable.
302           // Deliver them now.
303           DeliverQueuedReceivedData();
304         }
305       }
306       break;
307     }
308     case kOpen: {
309       break;
310     }
311     case kClosing: {
312       // For RTP data channels, we can go to "closed" after we finish
313       // sending data and the send/recv SSRCs are unset.
314       if (connected_to_provider_) {
315         DisconnectFromProvider();
316       }
317       if (!send_ssrc_set_ && !receive_ssrc_set_) {
318         SetState(kClosed);
319       }
320       break;
321     }
322     case kClosed:
323       break;
324   }
325 }
326 
SetState(DataState state)327 void RtpDataChannel::SetState(DataState state) {
328   RTC_DCHECK_RUN_ON(signaling_thread_);
329   if (state_ == state) {
330     return;
331   }
332 
333   state_ = state;
334   if (observer_) {
335     observer_->OnStateChange();
336   }
337   if (state_ == kOpen) {
338     SignalOpened(this);
339   } else if (state_ == kClosed) {
340     SignalClosed(this);
341   }
342 }
343 
DisconnectFromProvider()344 void RtpDataChannel::DisconnectFromProvider() {
345   RTC_DCHECK_RUN_ON(signaling_thread_);
346   if (!connected_to_provider_)
347     return;
348 
349   provider_->DisconnectDataChannel(this);
350   connected_to_provider_ = false;
351 }
352 
DeliverQueuedReceivedData()353 void RtpDataChannel::DeliverQueuedReceivedData() {
354   RTC_DCHECK_RUN_ON(signaling_thread_);
355   if (!observer_) {
356     return;
357   }
358 
359   while (!queued_received_data_.Empty()) {
360     std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
361     ++messages_received_;
362     bytes_received_ += buffer->size();
363     observer_->OnMessage(*buffer);
364   }
365 }
366 
SendDataMessage(const DataBuffer & buffer)367 bool RtpDataChannel::SendDataMessage(const DataBuffer& buffer) {
368   RTC_DCHECK_RUN_ON(signaling_thread_);
369   cricket::SendDataParams send_params;
370 
371   send_params.ssrc = send_ssrc_;
372   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
373 
374   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
375   bool success = provider_->SendData(send_params, buffer.data, &send_result);
376 
377   if (success) {
378     ++messages_sent_;
379     bytes_sent_ += buffer.size();
380     if (observer_ && buffer.size() > 0) {
381       observer_->OnBufferedAmountChange(buffer.size());
382     }
383     return true;
384   }
385 
386   return false;
387 }
388 
389 // static
ResetInternalIdAllocatorForTesting(int new_value)390 void RtpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
391   g_unique_id = new_value;
392 }
393 
394 }  // namespace webrtc
395