• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/task_queue_paced_sender.h"
12 
13 #include <list>
14 #include <memory>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "modules/pacing/packet_router.h"
20 #include "modules/utility/include/mock/mock_process_thread.h"
21 #include "test/field_trial.h"
22 #include "test/gmock.h"
23 #include "test/gtest.h"
24 #include "test/time_controller/simulated_time_controller.h"
25 
26 using ::testing::_;
27 using ::testing::AtLeast;
28 using ::testing::Return;
29 using ::testing::SaveArg;
30 
31 namespace webrtc {
32 namespace {
33 constexpr uint32_t kAudioSsrc = 12345;
34 constexpr uint32_t kVideoSsrc = 234565;
35 constexpr uint32_t kVideoRtxSsrc = 34567;
36 constexpr uint32_t kFlexFecSsrc = 45678;
37 constexpr size_t kDefaultPacketSize = 1234;
38 
39 class MockPacketRouter : public PacketRouter {
40  public:
41   MOCK_METHOD(void,
42               SendPacket,
43               (std::unique_ptr<RtpPacketToSend> packet,
44                const PacedPacketInfo& cluster_info),
45               (override));
46   MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
47               FetchFec,
48               (),
49               (override));
50   MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
51               GeneratePadding,
52               (DataSize target_size),
53               (override));
54 };
55 
56 class StatsUpdateObserver {
57  public:
58   StatsUpdateObserver() = default;
59   virtual ~StatsUpdateObserver() = default;
60 
61   virtual void OnStatsUpdated() = 0;
62 };
63 
64 class TaskQueuePacedSenderForTest : public TaskQueuePacedSender {
65  public:
TaskQueuePacedSenderForTest(Clock * clock,PacketRouter * packet_router,RtcEventLog * event_log,const WebRtcKeyValueConfig * field_trials,TaskQueueFactory * task_queue_factory,TimeDelta hold_back_window=PacingController::kMinSleepTime)66   TaskQueuePacedSenderForTest(
67       Clock* clock,
68       PacketRouter* packet_router,
69       RtcEventLog* event_log,
70       const WebRtcKeyValueConfig* field_trials,
71       TaskQueueFactory* task_queue_factory,
72       TimeDelta hold_back_window = PacingController::kMinSleepTime)
73       : TaskQueuePacedSender(clock,
74                              packet_router,
75                              event_log,
76                              field_trials,
77                              task_queue_factory,
78                              hold_back_window) {}
79 
OnStatsUpdated(const Stats & stats)80   void OnStatsUpdated(const Stats& stats) override {
81     ++num_stats_updates_;
82     TaskQueuePacedSender::OnStatsUpdated(stats);
83   }
84 
85   size_t num_stats_updates_ = 0;
86 };
87 }  // namespace
88 
89 namespace test {
90 
BuildRtpPacket(RtpPacketMediaType type)91   std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
92     auto packet = std::make_unique<RtpPacketToSend>(nullptr);
93     packet->set_packet_type(type);
94     switch (type) {
95       case RtpPacketMediaType::kAudio:
96         packet->SetSsrc(kAudioSsrc);
97         break;
98       case RtpPacketMediaType::kVideo:
99         packet->SetSsrc(kVideoSsrc);
100         break;
101       case RtpPacketMediaType::kRetransmission:
102       case RtpPacketMediaType::kPadding:
103         packet->SetSsrc(kVideoRtxSsrc);
104         break;
105       case RtpPacketMediaType::kForwardErrorCorrection:
106         packet->SetSsrc(kFlexFecSsrc);
107         break;
108     }
109 
110     packet->SetPayloadSize(kDefaultPacketSize);
111     return packet;
112   }
113 
GeneratePackets(RtpPacketMediaType type,size_t num_packets)114   std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
115       RtpPacketMediaType type,
116       size_t num_packets) {
117     std::vector<std::unique_ptr<RtpPacketToSend>> packets;
118     for (size_t i = 0; i < num_packets; ++i) {
119       packets.push_back(BuildRtpPacket(type));
120     }
121     return packets;
122   }
123 
TEST(TaskQueuePacedSenderTest,PacesPackets)124   TEST(TaskQueuePacedSenderTest, PacesPackets) {
125     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
126     MockPacketRouter packet_router;
127     TaskQueuePacedSenderForTest pacer(
128         time_controller.GetClock(), &packet_router,
129         /*event_log=*/nullptr,
130         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
131         PacingController::kMinSleepTime);
132 
133     // Insert a number of packets, covering one second.
134     static constexpr size_t kPacketsToSend = 42;
135     pacer.SetPacingRates(
136         DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
137         DataRate::Zero());
138     pacer.EnqueuePackets(
139         GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
140 
141     // Expect all of them to be sent.
142     size_t packets_sent = 0;
143     Timestamp end_time = Timestamp::PlusInfinity();
144     EXPECT_CALL(packet_router, SendPacket)
145         .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
146                             const PacedPacketInfo& cluster_info) {
147           ++packets_sent;
148           if (packets_sent == kPacketsToSend) {
149             end_time = time_controller.GetClock()->CurrentTime();
150           }
151         });
152 
153     const Timestamp start_time = time_controller.GetClock()->CurrentTime();
154 
155     // Packets should be sent over a period of close to 1s. Expect a little
156     // lower than this since initial probing is a bit quicker.
157     time_controller.AdvanceTime(TimeDelta::Seconds(1));
158     EXPECT_EQ(packets_sent, kPacketsToSend);
159     ASSERT_TRUE(end_time.IsFinite());
160     EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
161   }
162 
TEST(TaskQueuePacedSenderTest,ReschedulesProcessOnRateChange)163   TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
164     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
165     MockPacketRouter packet_router;
166     TaskQueuePacedSenderForTest pacer(
167         time_controller.GetClock(), &packet_router,
168         /*event_log=*/nullptr,
169         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
170         PacingController::kMinSleepTime);
171 
172     // Insert a number of packets to be sent 200ms apart.
173     const size_t kPacketsPerSecond = 5;
174     const DataRate kPacingRate =
175         DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
176     pacer.SetPacingRates(kPacingRate, DataRate::Zero());
177 
178     // Send some initial packets to be rid of any probes.
179     EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond);
180     pacer.EnqueuePackets(
181         GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
182     time_controller.AdvanceTime(TimeDelta::Seconds(1));
183 
184     // Insert three packets, and record send time of each of them.
185     // After the second packet is sent, double the send rate so we can
186     // check the third packets is sent after half the wait time.
187     Timestamp first_packet_time = Timestamp::MinusInfinity();
188     Timestamp second_packet_time = Timestamp::MinusInfinity();
189     Timestamp third_packet_time = Timestamp::MinusInfinity();
190 
191     EXPECT_CALL(packet_router, SendPacket)
192         .Times(3)
193         .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
194                             const PacedPacketInfo& cluster_info) {
195           if (first_packet_time.IsInfinite()) {
196             first_packet_time = time_controller.GetClock()->CurrentTime();
197           } else if (second_packet_time.IsInfinite()) {
198             second_packet_time = time_controller.GetClock()->CurrentTime();
199             pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero());
200           } else {
201             third_packet_time = time_controller.GetClock()->CurrentTime();
202           }
203         });
204 
205     pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
206     time_controller.AdvanceTime(TimeDelta::Millis(500));
207     ASSERT_TRUE(third_packet_time.IsFinite());
208     EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
209                 1.0);
210     EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
211                 1.0);
212   }
213 
TEST(TaskQueuePacedSenderTest,SendsAudioImmediately)214   TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
215     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
216     MockPacketRouter packet_router;
217     TaskQueuePacedSenderForTest pacer(
218         time_controller.GetClock(), &packet_router,
219         /*event_log=*/nullptr,
220         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
221         PacingController::kMinSleepTime);
222 
223     const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
224     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
225     const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
226 
227     pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
228 
229     // Add some initial video packets, only one should be sent.
230     EXPECT_CALL(packet_router, SendPacket);
231     pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
232     time_controller.AdvanceTime(TimeDelta::Zero());
233     ::testing::Mock::VerifyAndClearExpectations(&packet_router);
234 
235     // Advance time, but still before next packet should be sent.
236     time_controller.AdvanceTime(kPacketPacingTime / 2);
237 
238     // Insert an audio packet, it should be sent immediately.
239     EXPECT_CALL(packet_router, SendPacket);
240     pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
241     time_controller.AdvanceTime(TimeDelta::Zero());
242     ::testing::Mock::VerifyAndClearExpectations(&packet_router);
243   }
244 
TEST(TaskQueuePacedSenderTest,SleepsDuringCoalscingWindow)245   TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
246     const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
247     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
248     MockPacketRouter packet_router;
249     TaskQueuePacedSenderForTest pacer(
250         time_controller.GetClock(), &packet_router,
251         /*event_log=*/nullptr,
252         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
253         kCoalescingWindow);
254 
255     // Set rates so one packet adds one ms of buffer level.
256     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
257     const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
258     const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
259 
260     pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
261 
262     // Add 10 packets. The first should be sent immediately since the buffers
263     // are clear.
264     EXPECT_CALL(packet_router, SendPacket);
265     pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
266     time_controller.AdvanceTime(TimeDelta::Zero());
267     ::testing::Mock::VerifyAndClearExpectations(&packet_router);
268 
269     // Advance time to 1ms before the coalescing window ends. No packets should
270     // be sent.
271     EXPECT_CALL(packet_router, SendPacket).Times(0);
272     time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
273 
274     // Advance time to where coalescing window ends. All packets that should
275     // have been sent up til now will be sent.
276     EXPECT_CALL(packet_router, SendPacket).Times(5);
277     time_controller.AdvanceTime(TimeDelta::Millis(1));
278     ::testing::Mock::VerifyAndClearExpectations(&packet_router);
279   }
280 
TEST(TaskQueuePacedSenderTest,ProbingOverridesCoalescingWindow)281   TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
282     const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
283     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
284     MockPacketRouter packet_router;
285     TaskQueuePacedSenderForTest pacer(
286         time_controller.GetClock(), &packet_router,
287         /*event_log=*/nullptr,
288         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
289         kCoalescingWindow);
290 
291     // Set rates so one packet adds one ms of buffer level.
292     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
293     const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
294     const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
295 
296     pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
297 
298     // Add 10 packets. The first should be sent immediately since the buffers
299     // are clear. This will also trigger the probe to start.
300     EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
301     pacer.CreateProbeCluster(kPacingDataRate * 2, 17);
302     pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
303     time_controller.AdvanceTime(TimeDelta::Zero());
304     ::testing::Mock::VerifyAndClearExpectations(&packet_router);
305 
306     // Advance time to 1ms before the coalescing window ends. Packets should be
307     // flying.
308     EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
309     time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
310   }
311 
TEST(TaskQueuePacedSenderTest,RespectedMinTimeBetweenStatsUpdates)312   TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) {
313     const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
314     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
315     MockPacketRouter packet_router;
316     TaskQueuePacedSenderForTest pacer(
317         time_controller.GetClock(), &packet_router,
318         /*event_log=*/nullptr,
319         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
320         kCoalescingWindow);
321     const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
322     pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
323 
324     const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
325 
326     // Nothing inserted, no stats updates yet.
327     EXPECT_EQ(pacer.num_stats_updates_, 0u);
328 
329     // Insert one packet, stats should be updated.
330     pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
331     time_controller.AdvanceTime(TimeDelta::Zero());
332     EXPECT_EQ(pacer.num_stats_updates_, 1u);
333 
334     // Advance time half of the min stats update interval, and trigger a
335     // refresh - stats should not be updated yet.
336     time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
337     pacer.EnqueuePackets({});
338     time_controller.AdvanceTime(TimeDelta::Zero());
339     EXPECT_EQ(pacer.num_stats_updates_, 1u);
340 
341     // Advance time the next half, now stats update is triggered.
342     time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
343     pacer.EnqueuePackets({});
344     time_controller.AdvanceTime(TimeDelta::Zero());
345     EXPECT_EQ(pacer.num_stats_updates_, 2u);
346   }
347 
TEST(TaskQueuePacedSenderTest,ThrottlesStatsUpdates)348   TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) {
349     const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
350     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
351     MockPacketRouter packet_router;
352     TaskQueuePacedSenderForTest pacer(
353         time_controller.GetClock(), &packet_router,
354         /*event_log=*/nullptr,
355         /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
356         kCoalescingWindow);
357 
358     // Set rates so one packet adds 10ms of buffer level.
359     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
360     const TimeDelta kPacketPacingTime = TimeDelta::Millis(10);
361     const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
362     const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
363     const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
364 
365     // Nothing inserted, no stats updates yet.
366     size_t num_expected_stats_updates = 0;
367     EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
368     pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
369     time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
370     // Updating pacing rates refreshes stats.
371     EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
372 
373     // Record time when we insert first packet, this triggers the scheduled
374     // stats updating.
375     Clock* const clock = time_controller.GetClock();
376     const Timestamp start_time = clock->CurrentTime();
377 
378     while (clock->CurrentTime() - start_time <=
379            kMaxTimeBetweenStatsUpdates - kPacketPacingTime) {
380       // Enqueue packet, expect stats update.
381       pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
382       time_controller.AdvanceTime(TimeDelta::Zero());
383       EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
384 
385       // Advance time to halfway through pacing time, expect another stats
386       // update.
387       time_controller.AdvanceTime(kPacketPacingTime / 2);
388       pacer.EnqueuePackets({});
389       time_controller.AdvanceTime(TimeDelta::Zero());
390       EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
391 
392       // Advance time the rest of the way.
393       time_controller.AdvanceTime(kPacketPacingTime / 2);
394     }
395 
396     // At this point, the pace queue is drained so there is no more intersting
397     // update to be made - but there is still as schduled task that should run
398     // |kMaxTimeBetweenStatsUpdates| after the first update.
399     time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates -
400                                 clock->CurrentTime());
401     EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
402 
403     // Advance time a significant time - don't expect any more calls as stats
404     // updating does not happen when queue is drained.
405     time_controller.AdvanceTime(TimeDelta::Millis(400));
406     EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
407   }
408 
409 }  // namespace test
410 }  // namespace webrtc
411