1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
12
13 #include <math.h>
14
15 #include <algorithm>
16
17 #include "webrtc/base/constructormagic.h"
18 #include "webrtc/base/logging.h"
19 #include "webrtc/base/scoped_ptr.h"
20 #include "webrtc/base/thread_annotations.h"
21 #include "webrtc/modules/pacing/paced_sender.h"
22 #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
23 #include "webrtc/system_wrappers/include/clock.h"
24 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
25 #include "webrtc/typedefs.h"
26
27 namespace webrtc {
28
// Integer tuning constants. They are kept in an anonymous enum so that they
// act as plain compile-time integers.
enum {
  // Length of a timestamp group handed to InterArrival, in milliseconds.
  kTimestampGroupLengthMs = 5,
  // Number of fractional bits in the 24-bit absolute send time.
  kAbsSendTimeFraction = 18,
  // Left shift that widens the 24-bit send time to a 32-bit timestamp.
  kAbsSendTimeInterArrivalUpshift = 8,
  // Number of fractional bits in the widened 32-bit timestamp.
  kInterArrivalShift = kAbsSendTimeFraction + kAbsSendTimeInterArrivalUpshift,
  // Paced packets are treated as probes during this long after the first
  // packet, even when a valid estimate already exists.
  kInitialProbingIntervalMs = 2000,
  // Smallest number of deltas a cluster must contain to be reported.
  kMinClusterSize = 4,
  // When this many probes are stored and no cluster has formed, the oldest
  // probe is dropped. Also caps how many probes are logged individually.
  kMaxProbePackets = 15,
  // The stored probes are cleared once this many clusters have been found.
  kExpectedNumberOfProbes = 3
};

// Upper bound on the number of entries kept in the propagation delta history.
static const size_t kPropagationDeltaQueueMaxSize = 1000;
// Upper bound on the age, in milliseconds, of propagation delta history
// entries.
static const int64_t kPropagationDeltaQueueMaxTimeMs = 1000;
// Factor converting a widened 32-bit send timestamp to milliseconds.
static const double kTimestampToMs = 1000.0 / (1 << kInterArrivalShift);
44
// Drops every leading entry of |time| that is smaller than or equal to
// |deadline|, together with the entries of |value| at the same indices.
// |time| must be sorted in ascending order and both vectors must have the
// same length.
static void RemoveStaleEntries(
    std::vector<int64_t>* time, std::vector<int>* value, int64_t deadline) {
  assert(time->size() == value->size());
  // upper_bound yields the first timestamp strictly newer than |deadline|;
  // everything in front of it is stale.
  const std::vector<int64_t>::iterator first_fresh =
      std::upper_bound(time->begin(), time->end(), deadline);
  const size_t num_stale = first_fresh - time->begin();

  // |first_fresh| stays valid while |value| is trimmed because |time| has not
  // been modified yet.
  value->erase(value->begin(), value->begin() + num_stale);
  time->erase(time->begin(), first_fresh);
}
57
// Returns the keys of |map| as a vector, in the map's iteration order.
template <typename K, typename V>
std::vector<K> Keys(const std::map<K, V>& map) {
  std::vector<K> result;
  result.reserve(map.size());
  for (const auto& entry : map)
    result.push_back(entry.first);
  return result;
}
68
ConvertMsTo24Bits(int64_t time_ms)69 uint32_t ConvertMsTo24Bits(int64_t time_ms) {
70 uint32_t time_24_bits =
71 static_cast<uint32_t>(
72 ((static_cast<uint64_t>(time_ms) << kAbsSendTimeFraction) + 500) /
73 1000) &
74 0x00FFFFFF;
75 return time_24_bits;
76 }
77
IsWithinClusterBounds(int send_delta_ms,const Cluster & cluster_aggregate)78 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
79 int send_delta_ms,
80 const Cluster& cluster_aggregate) {
81 if (cluster_aggregate.count == 0)
82 return true;
83 float cluster_mean = cluster_aggregate.send_mean_ms /
84 static_cast<float>(cluster_aggregate.count);
85 return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f;
86 }
87
AddCluster(std::list<Cluster> * clusters,Cluster * cluster)88 void RemoteBitrateEstimatorAbsSendTime::AddCluster(
89 std::list<Cluster>* clusters,
90 Cluster* cluster) {
91 cluster->send_mean_ms /= static_cast<float>(cluster->count);
92 cluster->recv_mean_ms /= static_cast<float>(cluster->count);
93 cluster->mean_size /= cluster->count;
94 clusters->push_back(*cluster);
95 }
96
Id() const97 int RemoteBitrateEstimatorAbsSendTime::Id() const {
98 return static_cast<int>(reinterpret_cast<uint64_t>(this));
99 }
100
RemoteBitrateEstimatorAbsSendTime(RemoteBitrateObserver * observer,Clock * clock)101 RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
102 RemoteBitrateObserver* observer,
103 Clock* clock)
104 : crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
105 observer_(observer),
106 clock_(clock),
107 ssrcs_(),
108 inter_arrival_(),
109 estimator_(OverUseDetectorOptions()),
110 detector_(OverUseDetectorOptions()),
111 incoming_bitrate_(kBitrateWindowMs, 8000),
112 last_process_time_(-1),
113 process_interval_ms_(kProcessIntervalMs),
114 total_propagation_delta_ms_(0),
115 total_probes_received_(0),
116 first_packet_time_ms_(-1) {
117 assert(observer_);
118 assert(clock_);
119 LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
120 }
121
ComputeClusters(std::list<Cluster> * clusters) const122 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters(
123 std::list<Cluster>* clusters) const {
124 Cluster current;
125 int64_t prev_send_time = -1;
126 int64_t prev_recv_time = -1;
127 for (std::list<Probe>::const_iterator it = probes_.begin();
128 it != probes_.end();
129 ++it) {
130 if (prev_send_time >= 0) {
131 int send_delta_ms = it->send_time_ms - prev_send_time;
132 int recv_delta_ms = it->recv_time_ms - prev_recv_time;
133 if (send_delta_ms >= 1 && recv_delta_ms >= 1) {
134 ++current.num_above_min_delta;
135 }
136 if (!IsWithinClusterBounds(send_delta_ms, current)) {
137 if (current.count >= kMinClusterSize)
138 AddCluster(clusters, ¤t);
139 current = Cluster();
140 }
141 current.send_mean_ms += send_delta_ms;
142 current.recv_mean_ms += recv_delta_ms;
143 current.mean_size += it->payload_size;
144 ++current.count;
145 }
146 prev_send_time = it->send_time_ms;
147 prev_recv_time = it->recv_time_ms;
148 }
149 if (current.count >= kMinClusterSize)
150 AddCluster(clusters, ¤t);
151 }
152
153 std::list<Cluster>::const_iterator
FindBestProbe(const std::list<Cluster> & clusters) const154 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
155 const std::list<Cluster>& clusters) const {
156 int highest_probe_bitrate_bps = 0;
157 std::list<Cluster>::const_iterator best_it = clusters.end();
158 for (std::list<Cluster>::const_iterator it = clusters.begin();
159 it != clusters.end();
160 ++it) {
161 if (it->send_mean_ms == 0 || it->recv_mean_ms == 0)
162 continue;
163 int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms;
164 int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms;
165 if (it->num_above_min_delta > it->count / 2 &&
166 (it->recv_mean_ms - it->send_mean_ms <= 2.0f &&
167 it->send_mean_ms - it->recv_mean_ms <= 5.0f)) {
168 int probe_bitrate_bps =
169 std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps());
170 if (probe_bitrate_bps > highest_probe_bitrate_bps) {
171 highest_probe_bitrate_bps = probe_bitrate_bps;
172 best_it = it;
173 }
174 } else {
175 LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps
176 << " bps, received at " << recv_bitrate_bps
177 << " bps. Mean send delta: " << it->send_mean_ms
178 << " ms, mean recv delta: " << it->recv_mean_ms
179 << " ms, num probes: " << it->count;
180 break;
181 }
182 }
183 return best_it;
184 }
185
ProcessClusters(int64_t now_ms)186 void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) {
187 std::list<Cluster> clusters;
188 ComputeClusters(&clusters);
189 if (clusters.empty()) {
190 // If we reach the max number of probe packets and still have no clusters,
191 // we will remove the oldest one.
192 if (probes_.size() >= kMaxProbePackets)
193 probes_.pop_front();
194 return;
195 }
196
197 std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters);
198 if (best_it != clusters.end()) {
199 int probe_bitrate_bps =
200 std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps());
201 // Make sure that a probe sent on a lower bitrate than our estimate can't
202 // reduce the estimate.
203 if (IsBitrateImproving(probe_bitrate_bps) &&
204 probe_bitrate_bps > static_cast<int>(incoming_bitrate_.Rate(now_ms))) {
205 LOG(LS_INFO) << "Probe successful, sent at "
206 << best_it->GetSendBitrateBps() << " bps, received at "
207 << best_it->GetRecvBitrateBps()
208 << " bps. Mean send delta: " << best_it->send_mean_ms
209 << " ms, mean recv delta: " << best_it->recv_mean_ms
210 << " ms, num probes: " << best_it->count;
211 remote_rate_.SetEstimate(probe_bitrate_bps, now_ms);
212 }
213 }
214
215 // Not probing and received non-probe packet, or finished with current set
216 // of probes.
217 if (clusters.size() >= kExpectedNumberOfProbes)
218 probes_.clear();
219 }
220
IsBitrateImproving(int new_bitrate_bps) const221 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
222 int new_bitrate_bps) const {
223 bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0;
224 bool bitrate_above_estimate =
225 remote_rate_.ValidEstimate() &&
226 new_bitrate_bps > static_cast<int>(remote_rate_.LatestEstimate());
227 return initial_probe || bitrate_above_estimate;
228 }
229
IncomingPacketFeedbackVector(const std::vector<PacketInfo> & packet_feedback_vector)230 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketFeedbackVector(
231 const std::vector<PacketInfo>& packet_feedback_vector) {
232 for (const auto& packet_info : packet_feedback_vector) {
233 IncomingPacketInfo(packet_info.arrival_time_ms,
234 ConvertMsTo24Bits(packet_info.send_time_ms),
235 packet_info.payload_size, 0, packet_info.was_paced);
236 }
237 }
238
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header,bool was_paced)239 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(int64_t arrival_time_ms,
240 size_t payload_size,
241 const RTPHeader& header,
242 bool was_paced) {
243 if (!header.extension.hasAbsoluteSendTime) {
244 LOG(LS_WARNING) << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
245 "is missing absolute send time extension!";
246 return;
247 }
248 IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime,
249 payload_size, header.ssrc, was_paced);
250 }
251
IncomingPacketInfo(int64_t arrival_time_ms,uint32_t send_time_24bits,size_t payload_size,uint32_t ssrc,bool was_paced)252 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
253 int64_t arrival_time_ms,
254 uint32_t send_time_24bits,
255 size_t payload_size,
256 uint32_t ssrc,
257 bool was_paced) {
258 assert(send_time_24bits < (1ul << 24));
259 // Shift up send time to use the full 32 bits that inter_arrival works with,
260 // so wrapping works properly.
261 uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
262 int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs;
263
264 CriticalSectionScoped cs(crit_sect_.get());
265 int64_t now_ms = clock_->TimeInMilliseconds();
266 // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
267 // here.
268 ssrcs_[ssrc] = now_ms;
269 incoming_bitrate_.Update(payload_size, now_ms);
270 const BandwidthUsage prior_state = detector_.State();
271
272 if (first_packet_time_ms_ == -1)
273 first_packet_time_ms_ = clock_->TimeInMilliseconds();
274
275 uint32_t ts_delta = 0;
276 int64_t t_delta = 0;
277 int size_delta = 0;
278 // For now only try to detect probes while we don't have a valid estimate, and
279 // make sure the packet was paced. We currently assume that only packets
280 // larger than 200 bytes are paced by the sender.
281 was_paced = was_paced && payload_size > PacedSender::kMinProbePacketSize;
282 if (was_paced &&
283 (!remote_rate_.ValidEstimate() ||
284 now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) {
285 // TODO(holmer): Use a map instead to get correct order?
286 if (total_probes_received_ < kMaxProbePackets) {
287 int send_delta_ms = -1;
288 int recv_delta_ms = -1;
289 if (!probes_.empty()) {
290 send_delta_ms = send_time_ms - probes_.back().send_time_ms;
291 recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms;
292 }
293 LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms
294 << " ms, recv time=" << arrival_time_ms
295 << " ms, send delta=" << send_delta_ms
296 << " ms, recv delta=" << recv_delta_ms << " ms.";
297 }
298 probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size));
299 ++total_probes_received_;
300 ProcessClusters(now_ms);
301 }
302 if (!inter_arrival_.get()) {
303 inter_arrival_.reset(
304 new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000,
305 kTimestampToMs, true));
306 }
307 if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, payload_size,
308 &ts_delta, &t_delta, &size_delta)) {
309 double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
310 estimator_.Update(t_delta, ts_delta_ms, size_delta, detector_.State());
311 detector_.Detect(estimator_.offset(), ts_delta_ms,
312 estimator_.num_of_deltas(), arrival_time_ms);
313 UpdateStats(static_cast<int>(t_delta - ts_delta_ms), now_ms);
314 }
315 if (detector_.State() == kBwOverusing) {
316 uint32_t incoming_bitrate_bps = incoming_bitrate_.Rate(now_ms);
317 if (prior_state != kBwOverusing ||
318 remote_rate_.TimeToReduceFurther(now_ms, incoming_bitrate_bps)) {
319 // The first overuse should immediately trigger a new estimate.
320 // We also have to update the estimate immediately if we are overusing
321 // and the target bitrate is too high compared to what we are receiving.
322 UpdateEstimate(now_ms);
323 }
324 }
325 }
326
Process()327 int32_t RemoteBitrateEstimatorAbsSendTime::Process() {
328 if (TimeUntilNextProcess() > 0) {
329 return 0;
330 }
331 {
332 CriticalSectionScoped cs(crit_sect_.get());
333 UpdateEstimate(clock_->TimeInMilliseconds());
334 }
335 last_process_time_ = clock_->TimeInMilliseconds();
336 return 0;
337 }
338
TimeUntilNextProcess()339 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() {
340 if (last_process_time_ < 0) {
341 return 0;
342 }
343 {
344 CriticalSectionScoped cs(crit_sect_.get());
345 return last_process_time_ + process_interval_ms_ -
346 clock_->TimeInMilliseconds();
347 }
348 }
349
UpdateEstimate(int64_t now_ms)350 void RemoteBitrateEstimatorAbsSendTime::UpdateEstimate(int64_t now_ms) {
351 if (!inter_arrival_.get()) {
352 // No packets have been received on the active streams.
353 return;
354 }
355 for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) {
356 if ((now_ms - it->second) > kStreamTimeOutMs) {
357 ssrcs_.erase(it++);
358 } else {
359 ++it;
360 }
361 }
362 if (ssrcs_.empty()) {
363 // We can't update the estimate if we don't have any active streams.
364 inter_arrival_.reset();
365 // We deliberately don't reset the first_packet_time_ms_ here for now since
366 // we only probe for bandwidth in the beginning of a call right now.
367 return;
368 }
369
370 const RateControlInput input(detector_.State(),
371 incoming_bitrate_.Rate(now_ms),
372 estimator_.var_noise());
373 remote_rate_.Update(&input, now_ms);
374 unsigned int target_bitrate = remote_rate_.UpdateBandwidthEstimate(now_ms);
375 if (remote_rate_.ValidEstimate()) {
376 process_interval_ms_ = remote_rate_.GetFeedbackInterval();
377 observer_->OnReceiveBitrateChanged(Keys(ssrcs_), target_bitrate);
378 }
379 }
380
OnRttUpdate(int64_t avg_rtt_ms,int64_t max_rtt_ms)381 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
382 int64_t max_rtt_ms) {
383 CriticalSectionScoped cs(crit_sect_.get());
384 remote_rate_.SetRtt(avg_rtt_ms);
385 }
386
RemoveStream(unsigned int ssrc)387 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(unsigned int ssrc) {
388 CriticalSectionScoped cs(crit_sect_.get());
389 ssrcs_.erase(ssrc);
390 }
391
LatestEstimate(std::vector<unsigned int> * ssrcs,unsigned int * bitrate_bps) const392 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate(
393 std::vector<unsigned int>* ssrcs,
394 unsigned int* bitrate_bps) const {
395 CriticalSectionScoped cs(crit_sect_.get());
396 assert(ssrcs);
397 assert(bitrate_bps);
398 if (!remote_rate_.ValidEstimate()) {
399 return false;
400 }
401 *ssrcs = Keys(ssrcs_);
402 if (ssrcs_.empty()) {
403 *bitrate_bps = 0;
404 } else {
405 *bitrate_bps = remote_rate_.LatestEstimate();
406 }
407 return true;
408 }
409
GetStats(ReceiveBandwidthEstimatorStats * output) const410 bool RemoteBitrateEstimatorAbsSendTime::GetStats(
411 ReceiveBandwidthEstimatorStats* output) const {
412 {
413 CriticalSectionScoped cs(crit_sect_.get());
414 output->recent_propagation_time_delta_ms = recent_propagation_delta_ms_;
415 output->recent_arrival_time_ms = recent_update_time_ms_;
416 output->total_propagation_time_delta_ms = total_propagation_delta_ms_;
417 }
418 RemoveStaleEntries(
419 &output->recent_arrival_time_ms,
420 &output->recent_propagation_time_delta_ms,
421 clock_->TimeInMilliseconds() - kPropagationDeltaQueueMaxTimeMs);
422 return true;
423 }
424
UpdateStats(int propagation_delta_ms,int64_t now_ms)425 void RemoteBitrateEstimatorAbsSendTime::UpdateStats(int propagation_delta_ms,
426 int64_t now_ms) {
427 // The caller must enter crit_sect_ before the call.
428
429 // Remove the oldest entry if the size limit is reached.
430 if (recent_update_time_ms_.size() == kPropagationDeltaQueueMaxSize) {
431 recent_update_time_ms_.erase(recent_update_time_ms_.begin());
432 recent_propagation_delta_ms_.erase(recent_propagation_delta_ms_.begin());
433 }
434
435 recent_propagation_delta_ms_.push_back(propagation_delta_ms);
436 recent_update_time_ms_.push_back(now_ms);
437
438 RemoveStaleEntries(
439 &recent_update_time_ms_,
440 &recent_propagation_delta_ms_,
441 now_ms - kPropagationDeltaQueueMaxTimeMs);
442
443 total_propagation_delta_ms_ =
444 std::max(total_propagation_delta_ms_ + propagation_delta_ms, 0);
445 }
446
SetMinBitrate(int min_bitrate_bps)447 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) {
448 CriticalSectionScoped cs(crit_sect_.get());
449 remote_rate_.SetMinBitrate(min_bitrate_bps);
450 }
451 } // namespace webrtc
452