1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/pacing/task_queue_paced_sender.h" 12 13 #include <list> 14 #include <memory> 15 #include <string> 16 #include <utility> 17 #include <vector> 18 19 #include "modules/pacing/packet_router.h" 20 #include "modules/utility/include/mock/mock_process_thread.h" 21 #include "test/field_trial.h" 22 #include "test/gmock.h" 23 #include "test/gtest.h" 24 #include "test/time_controller/simulated_time_controller.h" 25 26 using ::testing::_; 27 using ::testing::AtLeast; 28 using ::testing::Return; 29 using ::testing::SaveArg; 30 31 namespace webrtc { 32 namespace { 33 constexpr uint32_t kAudioSsrc = 12345; 34 constexpr uint32_t kVideoSsrc = 234565; 35 constexpr uint32_t kVideoRtxSsrc = 34567; 36 constexpr uint32_t kFlexFecSsrc = 45678; 37 constexpr size_t kDefaultPacketSize = 1234; 38 39 class MockPacketRouter : public PacketRouter { 40 public: 41 MOCK_METHOD(void, 42 SendPacket, 43 (std::unique_ptr<RtpPacketToSend> packet, 44 const PacedPacketInfo& cluster_info), 45 (override)); 46 MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>, 47 FetchFec, 48 (), 49 (override)); 50 MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>, 51 GeneratePadding, 52 (DataSize target_size), 53 (override)); 54 }; 55 56 class StatsUpdateObserver { 57 public: 58 StatsUpdateObserver() = default; 59 virtual ~StatsUpdateObserver() = default; 60 61 virtual void OnStatsUpdated() = 0; 62 }; 63 64 class TaskQueuePacedSenderForTest : public TaskQueuePacedSender { 65 public: TaskQueuePacedSenderForTest(Clock * clock,PacketRouter * packet_router,RtcEventLog * event_log,const WebRtcKeyValueConfig * field_trials,TaskQueueFactory * task_queue_factory,TimeDelta hold_back_window=PacingController::kMinSleepTime)66 TaskQueuePacedSenderForTest( 67 Clock* clock, 68 PacketRouter* packet_router, 69 RtcEventLog* event_log, 70 const WebRtcKeyValueConfig* field_trials, 71 TaskQueueFactory* task_queue_factory, 72 TimeDelta hold_back_window = PacingController::kMinSleepTime) 73 : TaskQueuePacedSender(clock, 74 packet_router, 75 event_log, 76 field_trials, 77 task_queue_factory, 78 hold_back_window) {} 79 OnStatsUpdated(const Stats & stats)80 void OnStatsUpdated(const Stats& stats) override { 81 ++num_stats_updates_; 82 TaskQueuePacedSender::OnStatsUpdated(stats); 83 } 84 85 size_t num_stats_updates_ = 0; 86 }; 87 } // namespace 88 89 namespace test { 90 BuildRtpPacket(RtpPacketMediaType type)91 std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) { 92 auto packet = std::make_unique<RtpPacketToSend>(nullptr); 93 packet->set_packet_type(type); 94 switch (type) { 95 case RtpPacketMediaType::kAudio: 96 packet->SetSsrc(kAudioSsrc); 97 break; 98 case RtpPacketMediaType::kVideo: 99 packet->SetSsrc(kVideoSsrc); 100 break; 101 case RtpPacketMediaType::kRetransmission: 102 case RtpPacketMediaType::kPadding: 103 packet->SetSsrc(kVideoRtxSsrc); 104 break; 105 case RtpPacketMediaType::kForwardErrorCorrection: 106 packet->SetSsrc(kFlexFecSsrc); 107 break; 108 } 109 110 packet->SetPayloadSize(kDefaultPacketSize); 111 return packet; 112 } 113 GeneratePackets(RtpPacketMediaType type,size_t num_packets)114 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets( 115 RtpPacketMediaType type, 116 size_t num_packets) { 117 std::vector<std::unique_ptr<RtpPacketToSend>> packets; 118 for (size_t i = 0; i < num_packets; ++i) { 119 packets.push_back(BuildRtpPacket(type)); 120 } 121 return packets; 122 } 123 TEST(TaskQueuePacedSenderTest,PacesPackets)124 TEST(TaskQueuePacedSenderTest, PacesPackets) { 125 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 126 MockPacketRouter packet_router; 127 TaskQueuePacedSenderForTest pacer( 128 time_controller.GetClock(), &packet_router, 129 /*event_log=*/nullptr, 130 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 131 PacingController::kMinSleepTime); 132 133 // Insert a number of packets, covering one second. 134 static constexpr size_t kPacketsToSend = 42; 135 pacer.SetPacingRates( 136 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), 137 DataRate::Zero()); 138 pacer.EnqueuePackets( 139 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); 140 141 // Expect all of them to be sent. 142 size_t packets_sent = 0; 143 Timestamp end_time = Timestamp::PlusInfinity(); 144 EXPECT_CALL(packet_router, SendPacket) 145 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet, 146 const PacedPacketInfo& cluster_info) { 147 ++packets_sent; 148 if (packets_sent == kPacketsToSend) { 149 end_time = time_controller.GetClock()->CurrentTime(); 150 } 151 }); 152 153 const Timestamp start_time = time_controller.GetClock()->CurrentTime(); 154 155 // Packets should be sent over a period of close to 1s. Expect a little 156 // lower than this since initial probing is a bit quicker. 157 time_controller.AdvanceTime(TimeDelta::Seconds(1)); 158 EXPECT_EQ(packets_sent, kPacketsToSend); 159 ASSERT_TRUE(end_time.IsFinite()); 160 EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0); 161 } 162 TEST(TaskQueuePacedSenderTest,ReschedulesProcessOnRateChange)163 TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { 164 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 165 MockPacketRouter packet_router; 166 TaskQueuePacedSenderForTest pacer( 167 time_controller.GetClock(), &packet_router, 168 /*event_log=*/nullptr, 169 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 170 PacingController::kMinSleepTime); 171 172 // Insert a number of packets to be sent 200ms apart. 173 const size_t kPacketsPerSecond = 5; 174 const DataRate kPacingRate = 175 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond); 176 pacer.SetPacingRates(kPacingRate, DataRate::Zero()); 177 178 // Send some initial packets to be rid of any probes. 179 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond); 180 pacer.EnqueuePackets( 181 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond)); 182 time_controller.AdvanceTime(TimeDelta::Seconds(1)); 183 184 // Insert three packets, and record send time of each of them. 185 // After the second packet is sent, double the send rate so we can 186 // check the third packets is sent after half the wait time. 187 Timestamp first_packet_time = Timestamp::MinusInfinity(); 188 Timestamp second_packet_time = Timestamp::MinusInfinity(); 189 Timestamp third_packet_time = Timestamp::MinusInfinity(); 190 191 EXPECT_CALL(packet_router, SendPacket) 192 .Times(3) 193 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet, 194 const PacedPacketInfo& cluster_info) { 195 if (first_packet_time.IsInfinite()) { 196 first_packet_time = time_controller.GetClock()->CurrentTime(); 197 } else if (second_packet_time.IsInfinite()) { 198 second_packet_time = time_controller.GetClock()->CurrentTime(); 199 pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); 200 } else { 201 third_packet_time = time_controller.GetClock()->CurrentTime(); 202 } 203 }); 204 205 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3)); 206 time_controller.AdvanceTime(TimeDelta::Millis(500)); 207 ASSERT_TRUE(third_packet_time.IsFinite()); 208 EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0, 209 1.0); 210 EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0, 211 1.0); 212 } 213 TEST(TaskQueuePacedSenderTest,SendsAudioImmediately)214 TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { 215 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 216 MockPacketRouter packet_router; 217 TaskQueuePacedSenderForTest pacer( 218 time_controller.GetClock(), &packet_router, 219 /*event_log=*/nullptr, 220 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 221 PacingController::kMinSleepTime); 222 223 const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); 224 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 225 const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate; 226 227 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 228 229 // Add some initial video packets, only one should be sent. 230 EXPECT_CALL(packet_router, SendPacket); 231 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); 232 time_controller.AdvanceTime(TimeDelta::Zero()); 233 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 234 235 // Advance time, but still before next packet should be sent. 236 time_controller.AdvanceTime(kPacketPacingTime / 2); 237 238 // Insert an audio packet, it should be sent immediately. 239 EXPECT_CALL(packet_router, SendPacket); 240 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); 241 time_controller.AdvanceTime(TimeDelta::Zero()); 242 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 243 } 244 TEST(TaskQueuePacedSenderTest,SleepsDuringCoalscingWindow)245 TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { 246 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); 247 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 248 MockPacketRouter packet_router; 249 TaskQueuePacedSenderForTest pacer( 250 time_controller.GetClock(), &packet_router, 251 /*event_log=*/nullptr, 252 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 253 kCoalescingWindow); 254 255 // Set rates so one packet adds one ms of buffer level. 256 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 257 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 258 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 259 260 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 261 262 // Add 10 packets. The first should be sent immediately since the buffers 263 // are clear. 264 EXPECT_CALL(packet_router, SendPacket); 265 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); 266 time_controller.AdvanceTime(TimeDelta::Zero()); 267 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 268 269 // Advance time to 1ms before the coalescing window ends. No packets should 270 // be sent. 271 EXPECT_CALL(packet_router, SendPacket).Times(0); 272 time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); 273 274 // Advance time to where coalescing window ends. All packets that should 275 // have been sent up til now will be sent. 276 EXPECT_CALL(packet_router, SendPacket).Times(5); 277 time_controller.AdvanceTime(TimeDelta::Millis(1)); 278 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 279 } 280 TEST(TaskQueuePacedSenderTest,ProbingOverridesCoalescingWindow)281 TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { 282 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); 283 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 284 MockPacketRouter packet_router; 285 TaskQueuePacedSenderForTest pacer( 286 time_controller.GetClock(), &packet_router, 287 /*event_log=*/nullptr, 288 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 289 kCoalescingWindow); 290 291 // Set rates so one packet adds one ms of buffer level. 292 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 293 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 294 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 295 296 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 297 298 // Add 10 packets. The first should be sent immediately since the buffers 299 // are clear. This will also trigger the probe to start. 300 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); 301 pacer.CreateProbeCluster(kPacingDataRate * 2, 17); 302 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); 303 time_controller.AdvanceTime(TimeDelta::Zero()); 304 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 305 306 // Advance time to 1ms before the coalescing window ends. Packets should be 307 // flying. 308 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); 309 time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); 310 } 311 TEST(TaskQueuePacedSenderTest,RespectedMinTimeBetweenStatsUpdates)312 TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) { 313 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); 314 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 315 MockPacketRouter packet_router; 316 TaskQueuePacedSenderForTest pacer( 317 time_controller.GetClock(), &packet_router, 318 /*event_log=*/nullptr, 319 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 320 kCoalescingWindow); 321 const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300); 322 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 323 324 const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); 325 326 // Nothing inserted, no stats updates yet. 327 EXPECT_EQ(pacer.num_stats_updates_, 0u); 328 329 // Insert one packet, stats should be updated. 330 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); 331 time_controller.AdvanceTime(TimeDelta::Zero()); 332 EXPECT_EQ(pacer.num_stats_updates_, 1u); 333 334 // Advance time half of the min stats update interval, and trigger a 335 // refresh - stats should not be updated yet. 336 time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); 337 pacer.EnqueuePackets({}); 338 time_controller.AdvanceTime(TimeDelta::Zero()); 339 EXPECT_EQ(pacer.num_stats_updates_, 1u); 340 341 // Advance time the next half, now stats update is triggered. 342 time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2); 343 pacer.EnqueuePackets({}); 344 time_controller.AdvanceTime(TimeDelta::Zero()); 345 EXPECT_EQ(pacer.num_stats_updates_, 2u); 346 } 347 TEST(TaskQueuePacedSenderTest,ThrottlesStatsUpdates)348 TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) { 349 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); 350 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 351 MockPacketRouter packet_router; 352 TaskQueuePacedSenderForTest pacer( 353 time_controller.GetClock(), &packet_router, 354 /*event_log=*/nullptr, 355 /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), 356 kCoalescingWindow); 357 358 // Set rates so one packet adds 10ms of buffer level. 359 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 360 const TimeDelta kPacketPacingTime = TimeDelta::Millis(10); 361 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 362 const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1); 363 const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33); 364 365 // Nothing inserted, no stats updates yet. 366 size_t num_expected_stats_updates = 0; 367 EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); 368 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 369 time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates); 370 // Updating pacing rates refreshes stats. 371 EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); 372 373 // Record time when we insert first packet, this triggers the scheduled 374 // stats updating. 375 Clock* const clock = time_controller.GetClock(); 376 const Timestamp start_time = clock->CurrentTime(); 377 378 while (clock->CurrentTime() - start_time <= 379 kMaxTimeBetweenStatsUpdates - kPacketPacingTime) { 380 // Enqueue packet, expect stats update. 381 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); 382 time_controller.AdvanceTime(TimeDelta::Zero()); 383 EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); 384 385 // Advance time to halfway through pacing time, expect another stats 386 // update. 387 time_controller.AdvanceTime(kPacketPacingTime / 2); 388 pacer.EnqueuePackets({}); 389 time_controller.AdvanceTime(TimeDelta::Zero()); 390 EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); 391 392 // Advance time the rest of the way. 393 time_controller.AdvanceTime(kPacketPacingTime / 2); 394 } 395 396 // At this point, the pace queue is drained so there is no more intersting 397 // update to be made - but there is still as schduled task that should run 398 // |kMaxTimeBetweenStatsUpdates| after the first update. 399 time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates - 400 clock->CurrentTime()); 401 EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates); 402 403 // Advance time a significant time - don't expect any more calls as stats 404 // updating does not happen when queue is drained. 405 time_controller.AdvanceTime(TimeDelta::Millis(400)); 406 EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); 407 } 408 409 } // namespace test 410 } // namespace webrtc 411