1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h"
12
13 #include <cmath>
14 #include <cstdlib>
15 #include <memory>
16 #include <vector>
17
18 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
19 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
20 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
21 #include "modules/rtp_rtcp/source/time_util.h"
22 #include "rtc_base/logging.h"
23 #include "system_wrappers/include/clock.h"
24
25 namespace webrtc {
26
// A stream with no in-order packet for at least this long is reported as
// inactive by GetActiveStatisticsAndReset().
constexpr int64_t kStatisticsTimeoutMs = 8000;
// First constructor argument of the per-stream incoming bitrate estimator
// (presumably its averaging window, in milliseconds).
constexpr int64_t kStatisticsProcessIntervalMs = 1000;
29
~StreamStatistician()30 StreamStatistician::~StreamStatistician() {}
31
StreamStatisticianImpl(uint32_t ssrc,Clock * clock,int max_reordering_threshold)32 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc,
33 Clock* clock,
34 int max_reordering_threshold)
35 : ssrc_(ssrc),
36 clock_(clock),
37 incoming_bitrate_(kStatisticsProcessIntervalMs,
38 RateStatistics::kBpsScale),
39 max_reordering_threshold_(max_reordering_threshold),
40 enable_retransmit_detection_(false),
41 jitter_q4_(0),
42 cumulative_loss_(0),
43 cumulative_loss_rtcp_offset_(0),
44 last_receive_time_ms_(0),
45 last_received_timestamp_(0),
46 received_seq_first_(-1),
47 received_seq_max_(-1),
48 last_report_cumulative_loss_(0),
49 last_report_seq_max_(-1) {}
50
51 StreamStatisticianImpl::~StreamStatisticianImpl() = default;
52
UpdateOutOfOrder(const RtpPacketReceived & packet,int64_t sequence_number,int64_t now_ms)53 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
54 int64_t sequence_number,
55 int64_t now_ms) {
56 // Check if |packet| is second packet of a stream restart.
57 if (received_seq_out_of_order_) {
58 // Count the previous packet as a received; it was postponed below.
59 --cumulative_loss_;
60
61 uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1;
62 received_seq_out_of_order_ = absl::nullopt;
63 if (packet.SequenceNumber() == expected_sequence_number) {
64 // Ignore sequence number gap caused by stream restart for packet loss
65 // calculation, by setting received_seq_max_ to the sequence number just
66 // before the out-of-order seqno. This gives a net zero change of
67 // |cumulative_loss_|, for the two packets interpreted as a stream reset.
68 //
69 // Fraction loss for the next report may get a bit off, since we don't
70 // update last_report_seq_max_ and last_report_cumulative_loss_ in a
71 // consistent way.
72 last_report_seq_max_ = sequence_number - 2;
73 received_seq_max_ = sequence_number - 2;
74 return false;
75 }
76 }
77
78 if (std::abs(sequence_number - received_seq_max_) >
79 max_reordering_threshold_) {
80 // Sequence number gap looks too large, wait until next packet to check
81 // for a stream restart.
82 received_seq_out_of_order_ = packet.SequenceNumber();
83 // Postpone counting this as a received packet until we know how to update
84 // |received_seq_max_|, otherwise we temporarily decrement
85 // |cumulative_loss_|. The
86 // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects
87 // |cumulative_loss_| to be unchanged by the reception of the first packet
88 // after stream reset.
89 ++cumulative_loss_;
90 return true;
91 }
92
93 if (sequence_number > received_seq_max_)
94 return false;
95
96 // Old out of order packet, may be retransmit.
97 if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now_ms))
98 receive_counters_.retransmitted.AddPacket(packet);
99 return true;
100 }
101
UpdateCounters(const RtpPacketReceived & packet)102 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
103 MutexLock lock(&stream_lock_);
104 RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
105 int64_t now_ms = clock_->TimeInMilliseconds();
106
107 incoming_bitrate_.Update(packet.size(), now_ms);
108 receive_counters_.last_packet_received_timestamp_ms = now_ms;
109 receive_counters_.transmitted.AddPacket(packet);
110 --cumulative_loss_;
111
112 int64_t sequence_number =
113 seq_unwrapper_.UnwrapWithoutUpdate(packet.SequenceNumber());
114
115 if (!ReceivedRtpPacket()) {
116 received_seq_first_ = sequence_number;
117 last_report_seq_max_ = sequence_number - 1;
118 received_seq_max_ = sequence_number - 1;
119 receive_counters_.first_packet_time_ms = now_ms;
120 } else if (UpdateOutOfOrder(packet, sequence_number, now_ms)) {
121 return;
122 }
123 // In order packet.
124 cumulative_loss_ += sequence_number - received_seq_max_;
125 received_seq_max_ = sequence_number;
126 seq_unwrapper_.UpdateLast(sequence_number);
127
128 // If new time stamp and more than one in-order packet received, calculate
129 // new jitter statistics.
130 if (packet.Timestamp() != last_received_timestamp_ &&
131 (receive_counters_.transmitted.packets -
132 receive_counters_.retransmitted.packets) > 1) {
133 UpdateJitter(packet, now_ms);
134 }
135 last_received_timestamp_ = packet.Timestamp();
136 last_receive_time_ms_ = now_ms;
137 }
138
UpdateJitter(const RtpPacketReceived & packet,int64_t receive_time_ms)139 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
140 int64_t receive_time_ms) {
141 int64_t receive_diff_ms = receive_time_ms - last_receive_time_ms_;
142 RTC_DCHECK_GE(receive_diff_ms, 0);
143 uint32_t receive_diff_rtp = static_cast<uint32_t>(
144 (receive_diff_ms * packet.payload_type_frequency()) / 1000);
145 int32_t time_diff_samples =
146 receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_);
147
148 time_diff_samples = std::abs(time_diff_samples);
149
150 // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
151 // If this happens, don't update jitter value. Use 5 secs video frequency
152 // as the threshold.
153 if (time_diff_samples < 450000) {
154 // Note we calculate in Q4 to avoid using float.
155 int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
156 jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
157 }
158 }
159
SetMaxReorderingThreshold(int max_reordering_threshold)160 void StreamStatisticianImpl::SetMaxReorderingThreshold(
161 int max_reordering_threshold) {
162 MutexLock lock(&stream_lock_);
163 max_reordering_threshold_ = max_reordering_threshold;
164 }
165
EnableRetransmitDetection(bool enable)166 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
167 MutexLock lock(&stream_lock_);
168 enable_retransmit_detection_ = enable;
169 }
170
GetStats() const171 RtpReceiveStats StreamStatisticianImpl::GetStats() const {
172 MutexLock lock(&stream_lock_);
173 RtpReceiveStats stats;
174 stats.packets_lost = cumulative_loss_;
175 // TODO(nisse): Can we return a float instead?
176 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
177 stats.jitter = jitter_q4_ >> 4;
178 stats.last_packet_received_timestamp_ms =
179 receive_counters_.last_packet_received_timestamp_ms;
180 stats.packet_counter = receive_counters_.transmitted;
181 return stats;
182 }
183
GetActiveStatisticsAndReset(RtcpStatistics * statistics)184 bool StreamStatisticianImpl::GetActiveStatisticsAndReset(
185 RtcpStatistics* statistics) {
186 MutexLock lock(&stream_lock_);
187 if (clock_->TimeInMilliseconds() - last_receive_time_ms_ >=
188 kStatisticsTimeoutMs) {
189 // Not active.
190 return false;
191 }
192 if (!ReceivedRtpPacket()) {
193 return false;
194 }
195
196 *statistics = CalculateRtcpStatistics();
197
198 return true;
199 }
200
CalculateRtcpStatistics()201 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
202 RtcpStatistics stats;
203 // Calculate fraction lost.
204 int64_t exp_since_last = received_seq_max_ - last_report_seq_max_;
205 RTC_DCHECK_GE(exp_since_last, 0);
206
207 int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_;
208 if (exp_since_last > 0 && lost_since_last > 0) {
209 // Scale 0 to 255, where 255 is 100% loss.
210 stats.fraction_lost =
211 static_cast<uint8_t>(255 * lost_since_last / exp_since_last);
212 } else {
213 stats.fraction_lost = 0;
214 }
215
216 // TODO(danilchap): Ensure |stats.packets_lost| is clamped to fit in a signed
217 // 24-bit value.
218 stats.packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_;
219 if (stats.packets_lost < 0) {
220 // Clamp to zero. Work around to accomodate for senders that misbehave with
221 // negative cumulative loss.
222 stats.packets_lost = 0;
223 cumulative_loss_rtcp_offset_ = -cumulative_loss_;
224 }
225 stats.extended_highest_sequence_number =
226 static_cast<uint32_t>(received_seq_max_);
227 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
228 stats.jitter = jitter_q4_ >> 4;
229
230 // Only for report blocks in RTCP SR and RR.
231 last_report_cumulative_loss_ = cumulative_loss_;
232 last_report_seq_max_ = received_seq_max_;
233 BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "cumulative_loss_pkts",
234 clock_->TimeInMilliseconds(),
235 cumulative_loss_, ssrc_);
236 BWE_TEST_LOGGING_PLOT_WITH_SSRC(
237 1, "received_seq_max_pkts", clock_->TimeInMilliseconds(),
238 (received_seq_max_ - received_seq_first_), ssrc_);
239
240 return stats;
241 }
242
GetFractionLostInPercent() const243 absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
244 MutexLock lock(&stream_lock_);
245 if (!ReceivedRtpPacket()) {
246 return absl::nullopt;
247 }
248 int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_;
249 if (expected_packets <= 0) {
250 return absl::nullopt;
251 }
252 if (cumulative_loss_ <= 0) {
253 return 0;
254 }
255 return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets;
256 }
257
GetReceiveStreamDataCounters() const258 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
259 const {
260 MutexLock lock(&stream_lock_);
261 return receive_counters_;
262 }
263
BitrateReceived() const264 uint32_t StreamStatisticianImpl::BitrateReceived() const {
265 MutexLock lock(&stream_lock_);
266 return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
267 }
268
IsRetransmitOfOldPacket(const RtpPacketReceived & packet,int64_t now_ms) const269 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
270 const RtpPacketReceived& packet,
271 int64_t now_ms) const {
272 uint32_t frequency_khz = packet.payload_type_frequency() / 1000;
273 RTC_DCHECK_GT(frequency_khz, 0);
274
275 int64_t time_diff_ms = now_ms - last_receive_time_ms_;
276
277 // Diff in time stamp since last received in order.
278 uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_;
279 uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
280
281 int64_t max_delay_ms = 0;
282
283 // Jitter standard deviation in samples.
284 float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4));
285
286 // 2 times the standard deviation => 95% confidence.
287 // And transform to milliseconds by dividing by the frequency in kHz.
288 max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
289
290 // Min max_delay_ms is 1.
291 if (max_delay_ms == 0) {
292 max_delay_ms = 1;
293 }
294 return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
295 }
296
Create(Clock * clock)297 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
298 return std::make_unique<ReceiveStatisticsImpl>(clock);
299 }
300
ReceiveStatisticsImpl(Clock * clock)301 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
302 : clock_(clock),
303 last_returned_ssrc_(0),
304 max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
305
~ReceiveStatisticsImpl()306 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
307 while (!statisticians_.empty()) {
308 delete statisticians_.begin()->second;
309 statisticians_.erase(statisticians_.begin());
310 }
311 }
312
OnRtpPacket(const RtpPacketReceived & packet)313 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
314 // StreamStatisticianImpl instance is created once and only destroyed when
315 // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
316 // it's own locking so don't hold receive_statistics_lock_ (potential
317 // deadlock).
318 GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
319 }
320
GetStatistician(uint32_t ssrc) const321 StreamStatisticianImpl* ReceiveStatisticsImpl::GetStatistician(
322 uint32_t ssrc) const {
323 MutexLock lock(&receive_statistics_lock_);
324 const auto& it = statisticians_.find(ssrc);
325 if (it == statisticians_.end())
326 return NULL;
327 return it->second;
328 }
329
GetOrCreateStatistician(uint32_t ssrc)330 StreamStatisticianImpl* ReceiveStatisticsImpl::GetOrCreateStatistician(
331 uint32_t ssrc) {
332 MutexLock lock(&receive_statistics_lock_);
333 StreamStatisticianImpl*& impl = statisticians_[ssrc];
334 if (impl == nullptr) { // new element
335 impl = new StreamStatisticianImpl(ssrc, clock_, max_reordering_threshold_);
336 }
337 return impl;
338 }
339
SetMaxReorderingThreshold(int max_reordering_threshold)340 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
341 int max_reordering_threshold) {
342 std::map<uint32_t, StreamStatisticianImpl*> statisticians;
343 {
344 MutexLock lock(&receive_statistics_lock_);
345 max_reordering_threshold_ = max_reordering_threshold;
346 statisticians = statisticians_;
347 }
348 for (auto& statistician : statisticians) {
349 statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
350 }
351 }
352
SetMaxReorderingThreshold(uint32_t ssrc,int max_reordering_threshold)353 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
354 uint32_t ssrc,
355 int max_reordering_threshold) {
356 GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold(
357 max_reordering_threshold);
358 }
359
EnableRetransmitDetection(uint32_t ssrc,bool enable)360 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
361 bool enable) {
362 GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable);
363 }
364
RtcpReportBlocks(size_t max_blocks)365 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
366 size_t max_blocks) {
367 std::map<uint32_t, StreamStatisticianImpl*> statisticians;
368 {
369 MutexLock lock(&receive_statistics_lock_);
370 statisticians = statisticians_;
371 }
372 std::vector<rtcp::ReportBlock> result;
373 result.reserve(std::min(max_blocks, statisticians.size()));
374 auto add_report_block = [&result](uint32_t media_ssrc,
375 StreamStatisticianImpl* statistician) {
376 // Do we have receive statistics to send?
377 RtcpStatistics stats;
378 if (!statistician->GetActiveStatisticsAndReset(&stats))
379 return;
380 result.emplace_back();
381 rtcp::ReportBlock& block = result.back();
382 block.SetMediaSsrc(media_ssrc);
383 block.SetFractionLost(stats.fraction_lost);
384 if (!block.SetCumulativeLost(stats.packets_lost)) {
385 RTC_LOG(LS_WARNING) << "Cumulative lost is oversized.";
386 result.pop_back();
387 return;
388 }
389 block.SetExtHighestSeqNum(stats.extended_highest_sequence_number);
390 block.SetJitter(stats.jitter);
391 };
392
393 const auto start_it = statisticians.upper_bound(last_returned_ssrc_);
394 for (auto it = start_it;
395 result.size() < max_blocks && it != statisticians.end(); ++it)
396 add_report_block(it->first, it->second);
397 for (auto it = statisticians.begin();
398 result.size() < max_blocks && it != start_it; ++it)
399 add_report_block(it->first, it->second);
400
401 if (!result.empty())
402 last_returned_ssrc_ = result.back().source_ssrc();
403 return result;
404 }
405
406 } // namespace webrtc
407