• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h"
12 
13 #include <cmath>
14 #include <cstdlib>
15 #include <memory>
16 #include <utility>
17 #include <vector>
18 
19 #include "api/units/time_delta.h"
20 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
21 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
22 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
23 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
24 #include "modules/rtp_rtcp/source/time_util.h"
25 #include "rtc_base/logging.h"
26 #include "rtc_base/time_utils.h"
27 #include "system_wrappers/include/clock.h"
28 
29 namespace webrtc {
30 namespace {
31 constexpr int64_t kStatisticsTimeoutMs = 8000;
32 constexpr int64_t kStatisticsProcessIntervalMs = 1000;
33 }  // namespace
34 
~StreamStatistician()35 StreamStatistician::~StreamStatistician() {}
36 
StreamStatisticianImpl(uint32_t ssrc,Clock * clock,int max_reordering_threshold)37 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc, Clock* clock,
38                                                int max_reordering_threshold)
39     : ssrc_(ssrc),
40       clock_(clock),
41       delta_internal_unix_epoch_ms_(clock_->CurrentNtpInMilliseconds() -
42                                     clock_->TimeInMilliseconds() -
43                                     rtc::kNtpJan1970Millisecs),
44       incoming_bitrate_(kStatisticsProcessIntervalMs,
45                         RateStatistics::kBpsScale),
46       max_reordering_threshold_(max_reordering_threshold),
47       enable_retransmit_detection_(false),
48       cumulative_loss_is_capped_(false),
49       jitter_q4_(0),
50       cumulative_loss_(0),
51       cumulative_loss_rtcp_offset_(0),
52       last_receive_time_ms_(0),
53       last_received_timestamp_(0),
54       received_seq_first_(-1),
55       received_seq_max_(-1),
56       last_report_cumulative_loss_(0),
57       last_report_seq_max_(-1),
58       last_payload_type_frequency_(0) {}
59 
60 StreamStatisticianImpl::~StreamStatisticianImpl() = default;
61 
UpdateOutOfOrder(const RtpPacketReceived & packet,int64_t sequence_number,int64_t now_ms)62 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
63                                               int64_t sequence_number,
64                                               int64_t now_ms) {
65   // Check if `packet` is second packet of a stream restart.
66   if (received_seq_out_of_order_) {
67     // Count the previous packet as a received; it was postponed below.
68     --cumulative_loss_;
69 
70     uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1;
71     received_seq_out_of_order_ = absl::nullopt;
72     if (packet.SequenceNumber() == expected_sequence_number) {
73       // Ignore sequence number gap caused by stream restart for packet loss
74       // calculation, by setting received_seq_max_ to the sequence number just
75       // before the out-of-order seqno. This gives a net zero change of
76       // `cumulative_loss_`, for the two packets interpreted as a stream reset.
77       //
78       // Fraction loss for the next report may get a bit off, since we don't
79       // update last_report_seq_max_ and last_report_cumulative_loss_ in a
80       // consistent way.
81       last_report_seq_max_ = sequence_number - 2;
82       received_seq_max_ = sequence_number - 2;
83       return false;
84     }
85   }
86 
87   if (std::abs(sequence_number - received_seq_max_) >
88       max_reordering_threshold_) {
89     // Sequence number gap looks too large, wait until next packet to check
90     // for a stream restart.
91     received_seq_out_of_order_ = packet.SequenceNumber();
92     // Postpone counting this as a received packet until we know how to update
93     // `received_seq_max_`, otherwise we temporarily decrement
94     // `cumulative_loss_`. The
95     // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects
96     // `cumulative_loss_` to be unchanged by the reception of the first packet
97     // after stream reset.
98     ++cumulative_loss_;
99     return true;
100   }
101 
102   if (sequence_number > received_seq_max_)
103     return false;
104 
105   // Old out of order packet, may be retransmit.
106   if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now_ms))
107     receive_counters_.retransmitted.AddPacket(packet);
108   return true;
109 }
110 
UpdateCounters(const RtpPacketReceived & packet)111 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
112   RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
113   int64_t now_ms = clock_->TimeInMilliseconds();
114 
115   incoming_bitrate_.Update(packet.size(), now_ms);
116   receive_counters_.last_packet_received_timestamp_ms = now_ms;
117   receive_counters_.transmitted.AddPacket(packet);
118   --cumulative_loss_;
119 
120   int64_t sequence_number =
121       seq_unwrapper_.UnwrapWithoutUpdate(packet.SequenceNumber());
122 
123   if (!ReceivedRtpPacket()) {
124     received_seq_first_ = sequence_number;
125     last_report_seq_max_ = sequence_number - 1;
126     received_seq_max_ = sequence_number - 1;
127     receive_counters_.first_packet_time_ms = now_ms;
128   } else if (UpdateOutOfOrder(packet, sequence_number, now_ms)) {
129     return;
130   }
131   // In order packet.
132   cumulative_loss_ += sequence_number - received_seq_max_;
133   received_seq_max_ = sequence_number;
134   seq_unwrapper_.UpdateLast(sequence_number);
135 
136   // If new time stamp and more than one in-order packet received, calculate
137   // new jitter statistics.
138   if (packet.Timestamp() != last_received_timestamp_ &&
139       (receive_counters_.transmitted.packets -
140        receive_counters_.retransmitted.packets) > 1) {
141     UpdateJitter(packet, now_ms);
142   }
143   last_received_timestamp_ = packet.Timestamp();
144   last_receive_time_ms_ = now_ms;
145 }
146 
UpdateJitter(const RtpPacketReceived & packet,int64_t receive_time_ms)147 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
148                                           int64_t receive_time_ms) {
149   int64_t receive_diff_ms = receive_time_ms - last_receive_time_ms_;
150   RTC_DCHECK_GE(receive_diff_ms, 0);
151   uint32_t receive_diff_rtp = static_cast<uint32_t>(
152       (receive_diff_ms * packet.payload_type_frequency()) / 1000);
153   int32_t time_diff_samples =
154       receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_);
155 
156   time_diff_samples = std::abs(time_diff_samples);
157 
158   ReviseFrequencyAndJitter(packet.payload_type_frequency());
159 
160   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
161   // If this happens, don't update jitter value. Use 5 secs video frequency
162   // as the threshold.
163   if (time_diff_samples < 450000) {
164     // Note we calculate in Q4 to avoid using float.
165     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
166     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
167   }
168 }
169 
ReviseFrequencyAndJitter(int payload_type_frequency)170 void StreamStatisticianImpl::ReviseFrequencyAndJitter(
171     int payload_type_frequency) {
172   if (payload_type_frequency == last_payload_type_frequency_) {
173     return;
174   }
175 
176   if (payload_type_frequency != 0) {
177     if (last_payload_type_frequency_ != 0) {
178       // Value in "jitter_q4_" variable is a number of samples.
179       // I.e. jitter = timestamp (ms) * frequency (kHz).
180       // Since the frequency has changed we have to update the number of samples
181       // accordingly. The new value should rely on a new frequency.
182 
183       // If we don't do such procedure we end up with the number of samples that
184       // cannot be converted into milliseconds correctly
185       // (i.e. jitter_ms = jitter_q4_ >> 4 / (payload_type_frequency / 1000)).
186       // In such case, the number of samples has a "mix".
187 
188       // Doing so we pretend that everything prior and including the current
189       // packet were computed on packet's frequency.
190       jitter_q4_ = static_cast<int>(static_cast<uint64_t>(jitter_q4_) *
191                                     payload_type_frequency /
192                                     last_payload_type_frequency_);
193     }
194     // If last_payload_type_frequency_ is not present, the jitter_q4_
195     // variable has its initial value.
196 
197     // Keep last_payload_type_frequency_ up to date and non-zero (set).
198     last_payload_type_frequency_ = payload_type_frequency;
199   }
200 }
201 
SetMaxReorderingThreshold(int max_reordering_threshold)202 void StreamStatisticianImpl::SetMaxReorderingThreshold(
203     int max_reordering_threshold) {
204   max_reordering_threshold_ = max_reordering_threshold;
205 }
206 
EnableRetransmitDetection(bool enable)207 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
208   enable_retransmit_detection_ = enable;
209 }
210 
GetStats() const211 RtpReceiveStats StreamStatisticianImpl::GetStats() const {
212   RtpReceiveStats stats;
213   stats.packets_lost = cumulative_loss_;
214   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
215   stats.jitter = jitter_q4_ >> 4;
216   if (last_payload_type_frequency_ > 0) {
217     // Divide value in fractional seconds by frequency to get jitter in
218     // fractional seconds.
219     stats.interarrival_jitter =
220         webrtc::TimeDelta::Seconds(stats.jitter) / last_payload_type_frequency_;
221   }
222   if (receive_counters_.last_packet_received_timestamp_ms.has_value()) {
223     stats.last_packet_received_timestamp_ms =
224         *receive_counters_.last_packet_received_timestamp_ms +
225         delta_internal_unix_epoch_ms_;
226   }
227   stats.packet_counter = receive_counters_.transmitted;
228   return stats;
229 }
230 
MaybeAppendReportBlockAndReset(std::vector<rtcp::ReportBlock> & report_blocks)231 void StreamStatisticianImpl::MaybeAppendReportBlockAndReset(
232     std::vector<rtcp::ReportBlock>& report_blocks) {
233   int64_t now_ms = clock_->TimeInMilliseconds();
234   if (now_ms - last_receive_time_ms_ >= kStatisticsTimeoutMs) {
235     // Not active.
236     return;
237   }
238   if (!ReceivedRtpPacket()) {
239     return;
240   }
241 
242   report_blocks.emplace_back();
243   rtcp::ReportBlock& stats = report_blocks.back();
244   stats.SetMediaSsrc(ssrc_);
245   // Calculate fraction lost.
246   int64_t exp_since_last = received_seq_max_ - last_report_seq_max_;
247   RTC_DCHECK_GE(exp_since_last, 0);
248 
249   int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_;
250   if (exp_since_last > 0 && lost_since_last > 0) {
251     // Scale 0 to 255, where 255 is 100% loss.
252     stats.SetFractionLost(255 * lost_since_last / exp_since_last);
253   }
254 
255   int packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_;
256   if (packets_lost < 0) {
257     // Clamp to zero. Work around to accomodate for senders that misbehave with
258     // negative cumulative loss.
259     packets_lost = 0;
260     cumulative_loss_rtcp_offset_ = -cumulative_loss_;
261   }
262   if (packets_lost > 0x7fffff) {
263     // Packets lost is a 24 bit signed field, and thus should be clamped, as
264     // described in https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.3
265     if (!cumulative_loss_is_capped_) {
266       cumulative_loss_is_capped_ = true;
267       RTC_LOG(LS_WARNING) << "Cumulative loss reached maximum value for ssrc "
268                           << ssrc_;
269     }
270     packets_lost = 0x7fffff;
271   }
272   stats.SetCumulativeLost(packets_lost);
273   stats.SetExtHighestSeqNum(received_seq_max_);
274   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
275   stats.SetJitter(jitter_q4_ >> 4);
276 
277   // Only for report blocks in RTCP SR and RR.
278   last_report_cumulative_loss_ = cumulative_loss_;
279   last_report_seq_max_ = received_seq_max_;
280   BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "cumulative_loss_pkts", now_ms,
281                                   cumulative_loss_, ssrc_);
282   BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "received_seq_max_pkts", now_ms,
283                                   (received_seq_max_ - received_seq_first_),
284                                   ssrc_);
285 }
286 
GetFractionLostInPercent() const287 absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
288   if (!ReceivedRtpPacket()) {
289     return absl::nullopt;
290   }
291   int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_;
292   if (expected_packets <= 0) {
293     return absl::nullopt;
294   }
295   if (cumulative_loss_ <= 0) {
296     return 0;
297   }
298   return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets;
299 }
300 
GetReceiveStreamDataCounters() const301 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
302     const {
303   return receive_counters_;
304 }
305 
BitrateReceived() const306 uint32_t StreamStatisticianImpl::BitrateReceived() const {
307   return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
308 }
309 
IsRetransmitOfOldPacket(const RtpPacketReceived & packet,int64_t now_ms) const310 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
311     const RtpPacketReceived& packet,
312     int64_t now_ms) const {
313   uint32_t frequency_khz = packet.payload_type_frequency() / 1000;
314   RTC_DCHECK_GT(frequency_khz, 0);
315 
316   int64_t time_diff_ms = now_ms - last_receive_time_ms_;
317 
318   // Diff in time stamp since last received in order.
319   uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_;
320   uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
321 
322   int64_t max_delay_ms = 0;
323 
324   // Jitter standard deviation in samples.
325   float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4));
326 
327   // 2 times the standard deviation => 95% confidence.
328   // And transform to milliseconds by dividing by the frequency in kHz.
329   max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
330 
331   // Min max_delay_ms is 1.
332   if (max_delay_ms == 0) {
333     max_delay_ms = 1;
334   }
335   return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
336 }
337 
Create(Clock * clock)338 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
339   return std::make_unique<ReceiveStatisticsLocked>(
340       clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
341         return std::make_unique<StreamStatisticianLocked>(
342             ssrc, clock, max_reordering_threshold);
343       });
344 }
345 
CreateThreadCompatible(Clock * clock)346 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::CreateThreadCompatible(
347     Clock* clock) {
348   return std::make_unique<ReceiveStatisticsImpl>(
349       clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
350         return std::make_unique<StreamStatisticianImpl>(
351             ssrc, clock, max_reordering_threshold);
352       });
353 }
354 
ReceiveStatisticsImpl(Clock * clock,std::function<std::unique_ptr<StreamStatisticianImplInterface> (uint32_t ssrc,Clock * clock,int max_reordering_threshold)> stream_statistician_factory)355 ReceiveStatisticsImpl::ReceiveStatisticsImpl(
356     Clock* clock,
357     std::function<std::unique_ptr<StreamStatisticianImplInterface>(
358         uint32_t ssrc,
359         Clock* clock,
360         int max_reordering_threshold)> stream_statistician_factory)
361     : clock_(clock),
362       stream_statistician_factory_(std::move(stream_statistician_factory)),
363       last_returned_ssrc_idx_(0),
364       max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
365 
OnRtpPacket(const RtpPacketReceived & packet)366 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
367   // StreamStatisticianImpl instance is created once and only destroyed when
368   // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
369   // it's own locking so don't hold receive_statistics_lock_ (potential
370   // deadlock).
371   GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
372 }
373 
GetStatistician(uint32_t ssrc) const374 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
375     uint32_t ssrc) const {
376   const auto& it = statisticians_.find(ssrc);
377   if (it == statisticians_.end())
378     return nullptr;
379   return it->second.get();
380 }
381 
GetOrCreateStatistician(uint32_t ssrc)382 StreamStatisticianImplInterface* ReceiveStatisticsImpl::GetOrCreateStatistician(
383     uint32_t ssrc) {
384   std::unique_ptr<StreamStatisticianImplInterface>& impl = statisticians_[ssrc];
385   if (impl == nullptr) {  // new element
386     impl =
387         stream_statistician_factory_(ssrc, clock_, max_reordering_threshold_);
388     all_ssrcs_.push_back(ssrc);
389   }
390   return impl.get();
391 }
392 
SetMaxReorderingThreshold(int max_reordering_threshold)393 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
394     int max_reordering_threshold) {
395   max_reordering_threshold_ = max_reordering_threshold;
396   for (auto& statistician : statisticians_) {
397     statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
398   }
399 }
400 
SetMaxReorderingThreshold(uint32_t ssrc,int max_reordering_threshold)401 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
402     uint32_t ssrc,
403     int max_reordering_threshold) {
404   GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold(
405       max_reordering_threshold);
406 }
407 
EnableRetransmitDetection(uint32_t ssrc,bool enable)408 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
409                                                       bool enable) {
410   GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable);
411 }
412 
RtcpReportBlocks(size_t max_blocks)413 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
414     size_t max_blocks) {
415   std::vector<rtcp::ReportBlock> result;
416   result.reserve(std::min(max_blocks, all_ssrcs_.size()));
417 
418   size_t ssrc_idx = 0;
419   for (size_t i = 0; i < all_ssrcs_.size() && result.size() < max_blocks; ++i) {
420     ssrc_idx = (last_returned_ssrc_idx_ + i + 1) % all_ssrcs_.size();
421     const uint32_t media_ssrc = all_ssrcs_[ssrc_idx];
422     auto statistician_it = statisticians_.find(media_ssrc);
423     RTC_DCHECK(statistician_it != statisticians_.end());
424     statistician_it->second->MaybeAppendReportBlockAndReset(result);
425   }
426   last_returned_ssrc_idx_ = ssrc_idx;
427   return result;
428 }
429 
430 }  // namespace webrtc
431