• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "media/cast/net/pacing/paced_sender.h"
6 
7 #include "base/big_endian.h"
8 #include "base/bind.h"
9 #include "base/debug/dump_without_crashing.h"
10 #include "base/message_loop/message_loop.h"
11 #include "media/cast/logging/logging_impl.h"
12 
13 namespace media {
14 namespace cast {
15 
16 namespace {
17 
18 static const int64 kPacingIntervalMs = 10;
19 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
20 // bursts of packets.
21 static const size_t kPacingMaxBurstsPerFrame = 3;
22 static const size_t kMaxDedupeWindowMs = 500;
23 
24 // "Impossible" upper-bound on the maximum number of packets that should ever be
25 // enqueued in the pacer.  This is used to detect bugs, reported as crash dumps.
26 static const size_t kHugeQueueLengthSeconds = 10;
27 static const size_t kRidiculousNumberOfPackets =
28     kHugeQueueLengthSeconds * (kMaxBurstSize * 1000 / kPacingIntervalMs);
29 
30 }  // namespace
31 
DedupInfo()32 DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {}
33 
34 // static
MakePacketKey(const base::TimeTicks & ticks,uint32 ssrc,uint16 packet_id)35 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
36                                            uint32 ssrc,
37                                            uint16 packet_id) {
38   return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
39 }
40 
PacketSendRecord()41 PacedSender::PacketSendRecord::PacketSendRecord()
42     : last_byte_sent(0), last_byte_sent_for_audio(0) {}
43 
PacedSender(size_t target_burst_size,size_t max_burst_size,base::TickClock * clock,LoggingImpl * logging,PacketSender * transport,const scoped_refptr<base::SingleThreadTaskRunner> & transport_task_runner)44 PacedSender::PacedSender(
45     size_t target_burst_size,
46     size_t max_burst_size,
47     base::TickClock* clock,
48     LoggingImpl* logging,
49     PacketSender* transport,
50     const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
51     : clock_(clock),
52       logging_(logging),
53       transport_(transport),
54       transport_task_runner_(transport_task_runner),
55       audio_ssrc_(0),
56       video_ssrc_(0),
57       target_burst_size_(target_burst_size),
58       max_burst_size_(max_burst_size),
59       current_max_burst_size_(target_burst_size_),
60       next_max_burst_size_(target_burst_size_),
61       next_next_max_burst_size_(target_burst_size_),
62       current_burst_size_(0),
63       state_(State_Unblocked),
64       has_reached_upper_bound_once_(false),
65       weak_factory_(this) {
66 }
67 
~PacedSender()68 PacedSender::~PacedSender() {}
69 
RegisterAudioSsrc(uint32 audio_ssrc)70 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
71   audio_ssrc_ = audio_ssrc;
72 }
73 
RegisterVideoSsrc(uint32 video_ssrc)74 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
75   video_ssrc_ = video_ssrc;
76 }
77 
RegisterPrioritySsrc(uint32 ssrc)78 void PacedSender::RegisterPrioritySsrc(uint32 ssrc) {
79   priority_ssrcs_.push_back(ssrc);
80 }
81 
GetLastByteSentForPacket(const PacketKey & packet_key)82 int64 PacedSender::GetLastByteSentForPacket(const PacketKey& packet_key) {
83   PacketSendHistory::const_iterator it = send_history_.find(packet_key);
84   if (it == send_history_.end())
85     return 0;
86   return it->second.last_byte_sent;
87 }
88 
GetLastByteSentForSsrc(uint32 ssrc)89 int64 PacedSender::GetLastByteSentForSsrc(uint32 ssrc) {
90   std::map<uint32, int64>::const_iterator it = last_byte_sent_.find(ssrc);
91   if (it == last_byte_sent_.end())
92     return 0;
93   return it->second;
94 }
95 
SendPackets(const SendPacketVector & packets)96 bool PacedSender::SendPackets(const SendPacketVector& packets) {
97   if (packets.empty()) {
98     return true;
99   }
100   const bool high_priority = IsHighPriority(packets.begin()->first);
101   for (size_t i = 0; i < packets.size(); i++) {
102     DCHECK(IsHighPriority(packets[i].first) == high_priority);
103     if (high_priority) {
104       priority_packet_list_[packets[i].first] =
105           make_pair(PacketType_Normal, packets[i].second);
106     } else {
107       packet_list_[packets[i].first] =
108           make_pair(PacketType_Normal, packets[i].second);
109     }
110   }
111   if (state_ == State_Unblocked) {
112     SendStoredPackets();
113   }
114   return true;
115 }
116 
ShouldResend(const PacketKey & packet_key,const DedupInfo & dedup_info,const base::TimeTicks & now)117 bool PacedSender::ShouldResend(const PacketKey& packet_key,
118                                const DedupInfo& dedup_info,
119                                const base::TimeTicks& now) {
120   PacketSendHistory::const_iterator it = send_history_.find(packet_key);
121 
122   // No history of previous transmission. It might be sent too long ago.
123   if (it == send_history_.end())
124     return true;
125 
126   // Suppose there is request to retransmit X and there is an audio
127   // packet Y sent just before X. Reject retransmission of X if ACK for
128   // Y has not been received.
129   // Only do this for video packets.
130   if (packet_key.second.first == video_ssrc_) {
131     if (dedup_info.last_byte_acked_for_audio &&
132         it->second.last_byte_sent_for_audio &&
133         dedup_info.last_byte_acked_for_audio <
134         it->second.last_byte_sent_for_audio) {
135       return false;
136     }
137   }
138   // Retransmission interval has to be greater than |resend_interval|.
139   if (now - it->second.time < dedup_info.resend_interval)
140     return false;
141   return true;
142 }
143 
ResendPackets(const SendPacketVector & packets,const DedupInfo & dedup_info)144 bool PacedSender::ResendPackets(const SendPacketVector& packets,
145                                 const DedupInfo& dedup_info) {
146   if (packets.empty()) {
147     return true;
148   }
149   const bool high_priority = IsHighPriority(packets.begin()->first);
150   const base::TimeTicks now = clock_->NowTicks();
151   for (size_t i = 0; i < packets.size(); i++) {
152     if (!ShouldResend(packets[i].first, dedup_info, now)) {
153       LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
154       continue;
155     }
156 
157     DCHECK(IsHighPriority(packets[i].first) == high_priority);
158     if (high_priority) {
159       priority_packet_list_[packets[i].first] =
160           make_pair(PacketType_Resend, packets[i].second);
161     } else {
162       packet_list_[packets[i].first] =
163           make_pair(PacketType_Resend, packets[i].second);
164     }
165   }
166   if (state_ == State_Unblocked) {
167     SendStoredPackets();
168   }
169   return true;
170 }
171 
SendRtcpPacket(uint32 ssrc,PacketRef packet)172 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
173   if (state_ == State_TransportBlocked) {
174     priority_packet_list_[
175         PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
176         make_pair(PacketType_RTCP, packet);
177   } else {
178     // We pass the RTCP packets straight through.
179     if (!transport_->SendPacket(
180             packet,
181             base::Bind(&PacedSender::SendStoredPackets,
182                        weak_factory_.GetWeakPtr()))) {
183       state_ = State_TransportBlocked;
184     }
185   }
186   return true;
187 }
188 
CancelSendingPacket(const PacketKey & packet_key)189 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
190   packet_list_.erase(packet_key);
191   priority_packet_list_.erase(packet_key);
192 }
193 
PopNextPacket(PacketType * packet_type,PacketKey * packet_key)194 PacketRef PacedSender::PopNextPacket(PacketType* packet_type,
195                                      PacketKey* packet_key) {
196   PacketList* list = !priority_packet_list_.empty() ?
197       &priority_packet_list_ : &packet_list_;
198   DCHECK(!list->empty());
199   PacketList::iterator i = list->begin();
200   *packet_type = i->second.first;
201   *packet_key = i->first;
202   PacketRef ret = i->second.second;
203   list->erase(i);
204   return ret;
205 }
206 
IsHighPriority(const PacketKey & packet_key) const207 bool PacedSender::IsHighPriority(const PacketKey& packet_key) const {
208   return std::find(priority_ssrcs_.begin(), priority_ssrcs_.end(),
209                    packet_key.second.first) != priority_ssrcs_.end();
210 }
211 
empty() const212 bool PacedSender::empty() const {
213   return packet_list_.empty() && priority_packet_list_.empty();
214 }
215 
size() const216 size_t PacedSender::size() const {
217   return packet_list_.size() + priority_packet_list_.size();
218 }
219 
220 // This function can be called from three places:
221 // 1. User called one of the Send* functions and we were in an unblocked state.
222 // 2. state_ == State_TransportBlocked and the transport is calling us to
223 //    let us know that it's ok to send again.
224 // 3. state_ == State_BurstFull and there are still packets to send. In this
225 //    case we called PostDelayedTask on this function to start a new burst.
SendStoredPackets()226 void PacedSender::SendStoredPackets() {
227   State previous_state = state_;
228   state_ = State_Unblocked;
229   if (empty()) {
230     return;
231   }
232 
233   // If the queue ever becomes impossibly long, send a crash dump without
234   // actually crashing the process.
235   if (size() > kRidiculousNumberOfPackets && !has_reached_upper_bound_once_) {
236     NOTREACHED();
237     // Please use Cr=Internals-Cast label in bug reports:
238     base::debug::DumpWithoutCrashing();
239     has_reached_upper_bound_once_ = true;
240   }
241 
242   base::TimeTicks now = clock_->NowTicks();
243   // I don't actually trust that PostDelayTask(x - now) will mean that
244   // now >= x when the call happens, so check if the previous state was
245   // State_BurstFull too.
246   if (now >= burst_end_ || previous_state == State_BurstFull) {
247     // Start a new burst.
248     current_burst_size_ = 0;
249     burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
250 
251     // The goal here is to try to send out the queued packets over the next
252     // three bursts, while trying to keep the burst size below 10 if possible.
253     // We have some evidence that sending more than 12 packets in a row doesn't
254     // work very well, but we don't actually know why yet. Sending out packets
255     // sooner is better than sending out packets later as that gives us more
256     // time to re-send them if needed. So if we have less than 30 packets, just
257     // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
258     // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
259     // which is more bandwidth than the cast library should need, and sending
260     // out more data per second is unlikely to be helpful.
261     size_t max_burst_size = std::min(
262         max_burst_size_,
263         std::max(target_burst_size_, size() / kPacingMaxBurstsPerFrame));
264     current_max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
265     next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
266     next_next_max_burst_size_ = max_burst_size;
267   }
268 
269   base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
270                                 weak_factory_.GetWeakPtr());
271   while (!empty()) {
272     if (current_burst_size_ >= current_max_burst_size_) {
273       transport_task_runner_->PostDelayedTask(FROM_HERE,
274                                               cb,
275                                               burst_end_ - now);
276       state_ = State_BurstFull;
277       return;
278     }
279     PacketType packet_type;
280     PacketKey packet_key;
281     PacketRef packet = PopNextPacket(&packet_type, &packet_key);
282     PacketSendRecord send_record;
283     send_record.time = now;
284 
285     switch (packet_type) {
286       case PacketType_Resend:
287         LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
288         break;
289       case PacketType_Normal:
290         LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
291         break;
292       case PacketType_RTCP:
293         break;
294     }
295 
296     const bool socket_blocked = !transport_->SendPacket(packet, cb);
297 
298     // Save the send record.
299     send_record.last_byte_sent = transport_->GetBytesSent();
300     send_record.last_byte_sent_for_audio = GetLastByteSentForSsrc(audio_ssrc_);
301     send_history_[packet_key] = send_record;
302     send_history_buffer_[packet_key] = send_record;
303     last_byte_sent_[packet_key.second.first] = send_record.last_byte_sent;
304 
305     if (socket_blocked) {
306       state_ = State_TransportBlocked;
307       return;
308     }
309     current_burst_size_++;
310   }
311 
312   // Keep ~0.5 seconds of data (1000 packets).
313   if (send_history_buffer_.size() >=
314       max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs) {
315     send_history_.swap(send_history_buffer_);
316     send_history_buffer_.clear();
317   }
318   DCHECK_LE(send_history_buffer_.size(),
319             max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs);
320   state_ = State_Unblocked;
321 }
322 
LogPacketEvent(const Packet & packet,CastLoggingEvent event)323 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) {
324   // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
325   // if the packet is audio or video.
326   DCHECK_GE(packet.size(), 12u);
327   base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
328   uint32 ssrc;
329   bool success = reader.ReadU32(&ssrc);
330   DCHECK(success);
331   bool is_audio;
332   if (ssrc == audio_ssrc_) {
333     is_audio = true;
334   } else if (ssrc == video_ssrc_) {
335     is_audio = false;
336   } else {
337     DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
338     return;
339   }
340 
341   EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT;
342   logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type,
343       packet);
344 }
345 
346 }  // namespace cast
347 }  // namespace media
348