1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h"
12
13 #include <cmath>
14 #include <cstdlib>
15 #include <memory>
16 #include <utility>
17 #include <vector>
18
19 #include "api/units/time_delta.h"
20 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
21 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
22 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
23 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
24 #include "modules/rtp_rtcp/source/time_util.h"
25 #include "rtc_base/logging.h"
26 #include "rtc_base/time_utils.h"
27 #include "system_wrappers/include/clock.h"
28
29 namespace webrtc {
namespace {
// A stream whose last in-order packet arrived at least this many milliseconds
// ago is considered inactive and is skipped when RTCP report blocks are built.
constexpr int64_t kStatisticsTimeoutMs{8000};
// First constructor argument of the `incoming_bitrate_` rate tracker
// (presumably its averaging window in milliseconds -- confirm against
// RateStatistics).
constexpr int64_t kStatisticsProcessIntervalMs{1000};
}  // namespace
34
~StreamStatistician()35 StreamStatistician::~StreamStatistician() {}
36
StreamStatisticianImpl(uint32_t ssrc,Clock * clock,int max_reordering_threshold)37 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc, Clock* clock,
38 int max_reordering_threshold)
39 : ssrc_(ssrc),
40 clock_(clock),
41 delta_internal_unix_epoch_ms_(clock_->CurrentNtpInMilliseconds() -
42 clock_->TimeInMilliseconds() -
43 rtc::kNtpJan1970Millisecs),
44 incoming_bitrate_(kStatisticsProcessIntervalMs,
45 RateStatistics::kBpsScale),
46 max_reordering_threshold_(max_reordering_threshold),
47 enable_retransmit_detection_(false),
48 cumulative_loss_is_capped_(false),
49 jitter_q4_(0),
50 cumulative_loss_(0),
51 cumulative_loss_rtcp_offset_(0),
52 last_receive_time_ms_(0),
53 last_received_timestamp_(0),
54 received_seq_first_(-1),
55 received_seq_max_(-1),
56 last_report_cumulative_loss_(0),
57 last_report_seq_max_(-1),
58 last_payload_type_frequency_(0) {}
59
60 StreamStatisticianImpl::~StreamStatisticianImpl() = default;
61
UpdateOutOfOrder(const RtpPacketReceived & packet,int64_t sequence_number,int64_t now_ms)62 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
63 int64_t sequence_number,
64 int64_t now_ms) {
65 // Check if `packet` is second packet of a stream restart.
66 if (received_seq_out_of_order_) {
67 // Count the previous packet as a received; it was postponed below.
68 --cumulative_loss_;
69
70 uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1;
71 received_seq_out_of_order_ = absl::nullopt;
72 if (packet.SequenceNumber() == expected_sequence_number) {
73 // Ignore sequence number gap caused by stream restart for packet loss
74 // calculation, by setting received_seq_max_ to the sequence number just
75 // before the out-of-order seqno. This gives a net zero change of
76 // `cumulative_loss_`, for the two packets interpreted as a stream reset.
77 //
78 // Fraction loss for the next report may get a bit off, since we don't
79 // update last_report_seq_max_ and last_report_cumulative_loss_ in a
80 // consistent way.
81 last_report_seq_max_ = sequence_number - 2;
82 received_seq_max_ = sequence_number - 2;
83 return false;
84 }
85 }
86
87 if (std::abs(sequence_number - received_seq_max_) >
88 max_reordering_threshold_) {
89 // Sequence number gap looks too large, wait until next packet to check
90 // for a stream restart.
91 received_seq_out_of_order_ = packet.SequenceNumber();
92 // Postpone counting this as a received packet until we know how to update
93 // `received_seq_max_`, otherwise we temporarily decrement
94 // `cumulative_loss_`. The
95 // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects
96 // `cumulative_loss_` to be unchanged by the reception of the first packet
97 // after stream reset.
98 ++cumulative_loss_;
99 return true;
100 }
101
102 if (sequence_number > received_seq_max_)
103 return false;
104
105 // Old out of order packet, may be retransmit.
106 if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now_ms))
107 receive_counters_.retransmitted.AddPacket(packet);
108 return true;
109 }
110
UpdateCounters(const RtpPacketReceived & packet)111 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
112 RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
113 int64_t now_ms = clock_->TimeInMilliseconds();
114
115 incoming_bitrate_.Update(packet.size(), now_ms);
116 receive_counters_.last_packet_received_timestamp_ms = now_ms;
117 receive_counters_.transmitted.AddPacket(packet);
118 --cumulative_loss_;
119
120 int64_t sequence_number =
121 seq_unwrapper_.UnwrapWithoutUpdate(packet.SequenceNumber());
122
123 if (!ReceivedRtpPacket()) {
124 received_seq_first_ = sequence_number;
125 last_report_seq_max_ = sequence_number - 1;
126 received_seq_max_ = sequence_number - 1;
127 receive_counters_.first_packet_time_ms = now_ms;
128 } else if (UpdateOutOfOrder(packet, sequence_number, now_ms)) {
129 return;
130 }
131 // In order packet.
132 cumulative_loss_ += sequence_number - received_seq_max_;
133 received_seq_max_ = sequence_number;
134 seq_unwrapper_.UpdateLast(sequence_number);
135
136 // If new time stamp and more than one in-order packet received, calculate
137 // new jitter statistics.
138 if (packet.Timestamp() != last_received_timestamp_ &&
139 (receive_counters_.transmitted.packets -
140 receive_counters_.retransmitted.packets) > 1) {
141 UpdateJitter(packet, now_ms);
142 }
143 last_received_timestamp_ = packet.Timestamp();
144 last_receive_time_ms_ = now_ms;
145 }
146
UpdateJitter(const RtpPacketReceived & packet,int64_t receive_time_ms)147 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
148 int64_t receive_time_ms) {
149 int64_t receive_diff_ms = receive_time_ms - last_receive_time_ms_;
150 RTC_DCHECK_GE(receive_diff_ms, 0);
151 uint32_t receive_diff_rtp = static_cast<uint32_t>(
152 (receive_diff_ms * packet.payload_type_frequency()) / 1000);
153 int32_t time_diff_samples =
154 receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_);
155
156 time_diff_samples = std::abs(time_diff_samples);
157
158 ReviseFrequencyAndJitter(packet.payload_type_frequency());
159
160 // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
161 // If this happens, don't update jitter value. Use 5 secs video frequency
162 // as the threshold.
163 if (time_diff_samples < 450000) {
164 // Note we calculate in Q4 to avoid using float.
165 int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
166 jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
167 }
168 }
169
ReviseFrequencyAndJitter(int payload_type_frequency)170 void StreamStatisticianImpl::ReviseFrequencyAndJitter(
171 int payload_type_frequency) {
172 if (payload_type_frequency == last_payload_type_frequency_) {
173 return;
174 }
175
176 if (payload_type_frequency != 0) {
177 if (last_payload_type_frequency_ != 0) {
178 // Value in "jitter_q4_" variable is a number of samples.
179 // I.e. jitter = timestamp (ms) * frequency (kHz).
180 // Since the frequency has changed we have to update the number of samples
181 // accordingly. The new value should rely on a new frequency.
182
183 // If we don't do such procedure we end up with the number of samples that
184 // cannot be converted into milliseconds correctly
185 // (i.e. jitter_ms = jitter_q4_ >> 4 / (payload_type_frequency / 1000)).
186 // In such case, the number of samples has a "mix".
187
188 // Doing so we pretend that everything prior and including the current
189 // packet were computed on packet's frequency.
190 jitter_q4_ = static_cast<int>(static_cast<uint64_t>(jitter_q4_) *
191 payload_type_frequency /
192 last_payload_type_frequency_);
193 }
194 // If last_payload_type_frequency_ is not present, the jitter_q4_
195 // variable has its initial value.
196
197 // Keep last_payload_type_frequency_ up to date and non-zero (set).
198 last_payload_type_frequency_ = payload_type_frequency;
199 }
200 }
201
SetMaxReorderingThreshold(int max_reordering_threshold)202 void StreamStatisticianImpl::SetMaxReorderingThreshold(
203 int max_reordering_threshold) {
204 max_reordering_threshold_ = max_reordering_threshold;
205 }
206
EnableRetransmitDetection(bool enable)207 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
208 enable_retransmit_detection_ = enable;
209 }
210
GetStats() const211 RtpReceiveStats StreamStatisticianImpl::GetStats() const {
212 RtpReceiveStats stats;
213 stats.packets_lost = cumulative_loss_;
214 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
215 stats.jitter = jitter_q4_ >> 4;
216 if (last_payload_type_frequency_ > 0) {
217 // Divide value in fractional seconds by frequency to get jitter in
218 // fractional seconds.
219 stats.interarrival_jitter =
220 webrtc::TimeDelta::Seconds(stats.jitter) / last_payload_type_frequency_;
221 }
222 if (receive_counters_.last_packet_received_timestamp_ms.has_value()) {
223 stats.last_packet_received_timestamp_ms =
224 *receive_counters_.last_packet_received_timestamp_ms +
225 delta_internal_unix_epoch_ms_;
226 }
227 stats.packet_counter = receive_counters_.transmitted;
228 return stats;
229 }
230
MaybeAppendReportBlockAndReset(std::vector<rtcp::ReportBlock> & report_blocks)231 void StreamStatisticianImpl::MaybeAppendReportBlockAndReset(
232 std::vector<rtcp::ReportBlock>& report_blocks) {
233 int64_t now_ms = clock_->TimeInMilliseconds();
234 if (now_ms - last_receive_time_ms_ >= kStatisticsTimeoutMs) {
235 // Not active.
236 return;
237 }
238 if (!ReceivedRtpPacket()) {
239 return;
240 }
241
242 report_blocks.emplace_back();
243 rtcp::ReportBlock& stats = report_blocks.back();
244 stats.SetMediaSsrc(ssrc_);
245 // Calculate fraction lost.
246 int64_t exp_since_last = received_seq_max_ - last_report_seq_max_;
247 RTC_DCHECK_GE(exp_since_last, 0);
248
249 int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_;
250 if (exp_since_last > 0 && lost_since_last > 0) {
251 // Scale 0 to 255, where 255 is 100% loss.
252 stats.SetFractionLost(255 * lost_since_last / exp_since_last);
253 }
254
255 int packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_;
256 if (packets_lost < 0) {
257 // Clamp to zero. Work around to accomodate for senders that misbehave with
258 // negative cumulative loss.
259 packets_lost = 0;
260 cumulative_loss_rtcp_offset_ = -cumulative_loss_;
261 }
262 if (packets_lost > 0x7fffff) {
263 // Packets lost is a 24 bit signed field, and thus should be clamped, as
264 // described in https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.3
265 if (!cumulative_loss_is_capped_) {
266 cumulative_loss_is_capped_ = true;
267 RTC_LOG(LS_WARNING) << "Cumulative loss reached maximum value for ssrc "
268 << ssrc_;
269 }
270 packets_lost = 0x7fffff;
271 }
272 stats.SetCumulativeLost(packets_lost);
273 stats.SetExtHighestSeqNum(received_seq_max_);
274 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
275 stats.SetJitter(jitter_q4_ >> 4);
276
277 // Only for report blocks in RTCP SR and RR.
278 last_report_cumulative_loss_ = cumulative_loss_;
279 last_report_seq_max_ = received_seq_max_;
280 BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "cumulative_loss_pkts", now_ms,
281 cumulative_loss_, ssrc_);
282 BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "received_seq_max_pkts", now_ms,
283 (received_seq_max_ - received_seq_first_),
284 ssrc_);
285 }
286
GetFractionLostInPercent() const287 absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
288 if (!ReceivedRtpPacket()) {
289 return absl::nullopt;
290 }
291 int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_;
292 if (expected_packets <= 0) {
293 return absl::nullopt;
294 }
295 if (cumulative_loss_ <= 0) {
296 return 0;
297 }
298 return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets;
299 }
300
GetReceiveStreamDataCounters() const301 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
302 const {
303 return receive_counters_;
304 }
305
BitrateReceived() const306 uint32_t StreamStatisticianImpl::BitrateReceived() const {
307 return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
308 }
309
IsRetransmitOfOldPacket(const RtpPacketReceived & packet,int64_t now_ms) const310 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
311 const RtpPacketReceived& packet,
312 int64_t now_ms) const {
313 uint32_t frequency_khz = packet.payload_type_frequency() / 1000;
314 RTC_DCHECK_GT(frequency_khz, 0);
315
316 int64_t time_diff_ms = now_ms - last_receive_time_ms_;
317
318 // Diff in time stamp since last received in order.
319 uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_;
320 uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
321
322 int64_t max_delay_ms = 0;
323
324 // Jitter standard deviation in samples.
325 float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4));
326
327 // 2 times the standard deviation => 95% confidence.
328 // And transform to milliseconds by dividing by the frequency in kHz.
329 max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
330
331 // Min max_delay_ms is 1.
332 if (max_delay_ms == 0) {
333 max_delay_ms = 1;
334 }
335 return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
336 }
337
Create(Clock * clock)338 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
339 return std::make_unique<ReceiveStatisticsLocked>(
340 clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
341 return std::make_unique<StreamStatisticianLocked>(
342 ssrc, clock, max_reordering_threshold);
343 });
344 }
345
CreateThreadCompatible(Clock * clock)346 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::CreateThreadCompatible(
347 Clock* clock) {
348 return std::make_unique<ReceiveStatisticsImpl>(
349 clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
350 return std::make_unique<StreamStatisticianImpl>(
351 ssrc, clock, max_reordering_threshold);
352 });
353 }
354
ReceiveStatisticsImpl(Clock * clock,std::function<std::unique_ptr<StreamStatisticianImplInterface> (uint32_t ssrc,Clock * clock,int max_reordering_threshold)> stream_statistician_factory)355 ReceiveStatisticsImpl::ReceiveStatisticsImpl(
356 Clock* clock,
357 std::function<std::unique_ptr<StreamStatisticianImplInterface>(
358 uint32_t ssrc,
359 Clock* clock,
360 int max_reordering_threshold)> stream_statistician_factory)
361 : clock_(clock),
362 stream_statistician_factory_(std::move(stream_statistician_factory)),
363 last_returned_ssrc_idx_(0),
364 max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
365
OnRtpPacket(const RtpPacketReceived & packet)366 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
367 // StreamStatisticianImpl instance is created once and only destroyed when
368 // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
369 // it's own locking so don't hold receive_statistics_lock_ (potential
370 // deadlock).
371 GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
372 }
373
GetStatistician(uint32_t ssrc) const374 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
375 uint32_t ssrc) const {
376 const auto& it = statisticians_.find(ssrc);
377 if (it == statisticians_.end())
378 return nullptr;
379 return it->second.get();
380 }
381
GetOrCreateStatistician(uint32_t ssrc)382 StreamStatisticianImplInterface* ReceiveStatisticsImpl::GetOrCreateStatistician(
383 uint32_t ssrc) {
384 std::unique_ptr<StreamStatisticianImplInterface>& impl = statisticians_[ssrc];
385 if (impl == nullptr) { // new element
386 impl =
387 stream_statistician_factory_(ssrc, clock_, max_reordering_threshold_);
388 all_ssrcs_.push_back(ssrc);
389 }
390 return impl.get();
391 }
392
SetMaxReorderingThreshold(int max_reordering_threshold)393 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
394 int max_reordering_threshold) {
395 max_reordering_threshold_ = max_reordering_threshold;
396 for (auto& statistician : statisticians_) {
397 statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
398 }
399 }
400
SetMaxReorderingThreshold(uint32_t ssrc,int max_reordering_threshold)401 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
402 uint32_t ssrc,
403 int max_reordering_threshold) {
404 GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold(
405 max_reordering_threshold);
406 }
407
EnableRetransmitDetection(uint32_t ssrc,bool enable)408 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
409 bool enable) {
410 GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable);
411 }
412
RtcpReportBlocks(size_t max_blocks)413 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
414 size_t max_blocks) {
415 std::vector<rtcp::ReportBlock> result;
416 result.reserve(std::min(max_blocks, all_ssrcs_.size()));
417
418 size_t ssrc_idx = 0;
419 for (size_t i = 0; i < all_ssrcs_.size() && result.size() < max_blocks; ++i) {
420 ssrc_idx = (last_returned_ssrc_idx_ + i + 1) % all_ssrcs_.size();
421 const uint32_t media_ssrc = all_ssrcs_[ssrc_idx];
422 auto statistician_it = statisticians_.find(media_ssrc);
423 RTC_DCHECK(statistician_it != statisticians_.end());
424 statistician_it->second->MaybeAppendReportBlockAndReset(result);
425 }
426 last_returned_ssrc_idx_ = ssrc_idx;
427 return result;
428 }
429
430 } // namespace webrtc
431