1 /*
2 * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/modules/remote_bitrate_estimator/test/packet_sender.h"
12
13 #include <algorithm>
14 #include <list>
15 #include <sstream>
16
17 #include "webrtc/base/checks.h"
18 #include "webrtc/modules/include/module_common_types.h"
19 #include "webrtc/modules/remote_bitrate_estimator/test/bwe.h"
20 #include "webrtc/modules/remote_bitrate_estimator/test/metric_recorder.h"
21
22 namespace webrtc {
23 namespace testing {
24 namespace bwe {
25
Pause()26 void PacketSender::Pause() {
27 running_ = false;
28 if (metric_recorder_ != nullptr) {
29 metric_recorder_->PauseFlow();
30 }
31 }
32
Resume(int64_t paused_time_ms)33 void PacketSender::Resume(int64_t paused_time_ms) {
34 running_ = true;
35 if (metric_recorder_ != nullptr) {
36 metric_recorder_->ResumeFlow(paused_time_ms);
37 }
38 }
39
set_metric_recorder(MetricRecorder * metric_recorder)40 void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) {
41 metric_recorder_ = metric_recorder;
42 }
43
RecordBitrate()44 void PacketSender::RecordBitrate() {
45 if (metric_recorder_ != nullptr) {
46 BWE_TEST_LOGGING_CONTEXT("Sender");
47 BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin());
48 metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds());
49 metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps());
50 }
51 }
52
GetFeedbackPackets(Packets * in_out,int64_t end_time_ms,int flow_id)53 std::list<FeedbackPacket*> GetFeedbackPackets(Packets* in_out,
54 int64_t end_time_ms,
55 int flow_id) {
56 std::list<FeedbackPacket*> fb_packets;
57 for (auto it = in_out->begin(); it != in_out->end();) {
58 if ((*it)->send_time_us() > 1000 * end_time_ms)
59 break;
60 if ((*it)->GetPacketType() == Packet::kFeedback &&
61 flow_id == (*it)->flow_id()) {
62 fb_packets.push_back(static_cast<FeedbackPacket*>(*it));
63 it = in_out->erase(it);
64 } else {
65 ++it;
66 }
67 }
68 return fb_packets;
69 }
70
VideoSender(PacketProcessorListener * listener,VideoSource * source,BandwidthEstimatorType estimator_type)71 VideoSender::VideoSender(PacketProcessorListener* listener,
72 VideoSource* source,
73 BandwidthEstimatorType estimator_type)
74 : PacketSender(listener, source->flow_id()),
75 source_(source),
76 bwe_(CreateBweSender(estimator_type,
77 source_->bits_per_second() / 1000,
78 this,
79 &clock_)),
80 previous_sending_bitrate_(0) {
81 modules_.push_back(bwe_.get());
82 }
83
~VideoSender()84 VideoSender::~VideoSender() {
85 }
86
Pause()87 void VideoSender::Pause() {
88 previous_sending_bitrate_ = TargetBitrateKbps();
89 PacketSender::Pause();
90 }
91
Resume(int64_t paused_time_ms)92 void VideoSender::Resume(int64_t paused_time_ms) {
93 source_->SetBitrateBps(previous_sending_bitrate_);
94 PacketSender::Resume(paused_time_ms);
95 }
96
RunFor(int64_t time_ms,Packets * in_out)97 void VideoSender::RunFor(int64_t time_ms, Packets* in_out) {
98 std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
99 in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id());
100 ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out);
101 }
102
ProcessFeedbackAndGeneratePackets(int64_t time_ms,std::list<FeedbackPacket * > * feedbacks,Packets * packets)103 void VideoSender::ProcessFeedbackAndGeneratePackets(
104 int64_t time_ms,
105 std::list<FeedbackPacket*>* feedbacks,
106 Packets* packets) {
107 do {
108 // Make sure to at least run Process() below every 100 ms.
109 int64_t time_to_run_ms = std::min<int64_t>(time_ms, 100);
110 if (!feedbacks->empty()) {
111 int64_t time_until_feedback_ms =
112 feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds();
113 time_to_run_ms =
114 std::max<int64_t>(std::min(time_ms, time_until_feedback_ms), 0);
115 }
116
117 if (!running_) {
118 source_->SetBitrateBps(0);
119 }
120
121 Packets generated;
122 source_->RunFor(time_to_run_ms, &generated);
123 bwe_->OnPacketsSent(generated);
124 packets->merge(generated, DereferencingComparator<Packet>);
125
126 clock_.AdvanceTimeMilliseconds(time_to_run_ms);
127
128 if (!feedbacks->empty()) {
129 bwe_->GiveFeedback(*feedbacks->front());
130 delete feedbacks->front();
131 feedbacks->pop_front();
132 }
133
134 bwe_->Process();
135
136 time_ms -= time_to_run_ms;
137 } while (time_ms > 0);
138 assert(feedbacks->empty());
139 }
140
GetFeedbackIntervalMs() const141 int VideoSender::GetFeedbackIntervalMs() const {
142 return bwe_->GetFeedbackIntervalMs();
143 }
144
OnNetworkChanged(uint32_t target_bitrate_bps,uint8_t fraction_lost,int64_t rtt)145 void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
146 uint8_t fraction_lost,
147 int64_t rtt) {
148 source_->SetBitrateBps(target_bitrate_bps);
149 RecordBitrate();
150 }
151
TargetBitrateKbps()152 uint32_t VideoSender::TargetBitrateKbps() {
153 return (source_->bits_per_second() + 500) / 1000;
154 }
155
PacedVideoSender(PacketProcessorListener * listener,VideoSource * source,BandwidthEstimatorType estimator)156 PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener,
157 VideoSource* source,
158 BandwidthEstimatorType estimator)
159 : VideoSender(listener, source, estimator),
160 pacer_(&clock_,
161 this,
162 source->bits_per_second() / 1000,
163 PacedSender::kDefaultPaceMultiplier * source->bits_per_second() /
164 1000,
165 0) {
166 modules_.push_back(&pacer_);
167 }
168
~PacedVideoSender()169 PacedVideoSender::~PacedVideoSender() {
170 for (Packet* packet : pacer_queue_)
171 delete packet;
172 for (Packet* packet : queue_)
173 delete packet;
174 }
175
RunFor(int64_t time_ms,Packets * in_out)176 void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) {
177 int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms;
178 // Run process periodically to allow the packets to be paced out.
179 std::list<FeedbackPacket*> feedbacks =
180 GetFeedbackPackets(in_out, end_time_ms, source_->flow_id());
181 int64_t last_run_time_ms = -1;
182 BWE_TEST_LOGGING_CONTEXT("Sender");
183 BWE_TEST_LOGGING_CONTEXT(source_->flow_id());
184 do {
185 int64_t time_until_process_ms = TimeUntilNextProcess(modules_);
186 int64_t time_until_feedback_ms = time_ms;
187 if (!feedbacks.empty())
188 time_until_feedback_ms = std::max<int64_t>(
189 feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0);
190
191 int64_t time_until_next_event_ms =
192 std::min(time_until_feedback_ms, time_until_process_ms);
193
194 time_until_next_event_ms =
195 std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms);
196
197 // Never run for longer than we have been asked for.
198 if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms)
199 time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds();
200
201 // Make sure we don't get stuck if an event doesn't trigger. This typically
202 // happens if the prober wants to probe, but there's no packet to send.
203 if (time_until_next_event_ms == 0 && last_run_time_ms == 0)
204 time_until_next_event_ms = 1;
205 last_run_time_ms = time_until_next_event_ms;
206
207 Packets generated_packets;
208 source_->RunFor(time_until_next_event_ms, &generated_packets);
209 if (!generated_packets.empty()) {
210 for (Packet* packet : generated_packets) {
211 MediaPacket* media_packet = static_cast<MediaPacket*>(packet);
212 pacer_.InsertPacket(
213 PacedSender::kNormalPriority, media_packet->header().ssrc,
214 media_packet->header().sequenceNumber, media_packet->send_time_ms(),
215 media_packet->payload_size(), false);
216 pacer_queue_.push_back(packet);
217 assert(pacer_queue_.size() < 10000);
218 }
219 }
220
221 clock_.AdvanceTimeMilliseconds(time_until_next_event_ms);
222
223 if (time_until_next_event_ms == time_until_feedback_ms) {
224 if (!feedbacks.empty()) {
225 bwe_->GiveFeedback(*feedbacks.front());
226 delete feedbacks.front();
227 feedbacks.pop_front();
228 }
229 bwe_->Process();
230 }
231
232 if (time_until_next_event_ms == time_until_process_ms) {
233 CallProcess(modules_);
234 }
235 } while (clock_.TimeInMilliseconds() < end_time_ms);
236 QueuePackets(in_out, end_time_ms * 1000);
237 }
238
TimeUntilNextProcess(const std::list<Module * > & modules)239 int64_t PacedVideoSender::TimeUntilNextProcess(
240 const std::list<Module*>& modules) {
241 int64_t time_until_next_process_ms = 10;
242 for (Module* module : modules) {
243 int64_t next_process_ms = module->TimeUntilNextProcess();
244 if (next_process_ms < time_until_next_process_ms)
245 time_until_next_process_ms = next_process_ms;
246 }
247 if (time_until_next_process_ms < 0)
248 time_until_next_process_ms = 0;
249 return time_until_next_process_ms;
250 }
251
CallProcess(const std::list<Module * > & modules)252 void PacedVideoSender::CallProcess(const std::list<Module*>& modules) {
253 for (Module* module : modules) {
254 if (module->TimeUntilNextProcess() <= 0) {
255 module->Process();
256 }
257 }
258 }
259
QueuePackets(Packets * batch,int64_t end_of_batch_time_us)260 void PacedVideoSender::QueuePackets(Packets* batch,
261 int64_t end_of_batch_time_us) {
262 queue_.merge(*batch, DereferencingComparator<Packet>);
263 if (queue_.empty()) {
264 return;
265 }
266 Packets::iterator it = queue_.begin();
267 for (; it != queue_.end(); ++it) {
268 if ((*it)->send_time_us() > end_of_batch_time_us) {
269 break;
270 }
271 }
272 Packets to_transfer;
273 to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it);
274 for (Packet* packet : to_transfer)
275 packet->set_paced(true);
276 bwe_->OnPacketsSent(to_transfer);
277 batch->merge(to_transfer, DereferencingComparator<Packet>);
278 }
279
TimeToSendPacket(uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,bool retransmission)280 bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc,
281 uint16_t sequence_number,
282 int64_t capture_time_ms,
283 bool retransmission) {
284 for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end();
285 ++it) {
286 MediaPacket* media_packet = static_cast<MediaPacket*>(*it);
287 if (media_packet->header().sequenceNumber == sequence_number) {
288 int64_t pace_out_time_ms = clock_.TimeInMilliseconds();
289
290 // Make sure a packet is never paced out earlier than when it was put into
291 // the pacer.
292 assert(pace_out_time_ms >= media_packet->send_time_ms());
293
294 media_packet->SetAbsSendTimeMs(pace_out_time_ms);
295 media_packet->set_send_time_us(1000 * pace_out_time_ms);
296 media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms);
297 queue_.push_back(media_packet);
298 pacer_queue_.erase(it);
299 return true;
300 }
301 }
302 return false;
303 }
304
TimeToSendPadding(size_t bytes)305 size_t PacedVideoSender::TimeToSendPadding(size_t bytes) {
306 return 0;
307 }
308
OnNetworkChanged(uint32_t target_bitrate_bps,uint8_t fraction_lost,int64_t rtt)309 void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
310 uint8_t fraction_lost,
311 int64_t rtt) {
312 VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt);
313 pacer_.UpdateBitrate(
314 target_bitrate_bps / 1000,
315 PacedSender::kDefaultPaceMultiplier * target_bitrate_bps / 1000, 0);
316 }
317
318 const int kNoLimit = std::numeric_limits<int>::max();
319 const int kPacketSizeBytes = 1200;
320
TcpSender(PacketProcessorListener * listener,int flow_id,int64_t offset_ms)321 TcpSender::TcpSender(PacketProcessorListener* listener,
322 int flow_id,
323 int64_t offset_ms)
324 : TcpSender(listener, flow_id, offset_ms, kNoLimit) {
325 }
326
TcpSender(PacketProcessorListener * listener,int flow_id,int64_t offset_ms,int send_limit_bytes)327 TcpSender::TcpSender(PacketProcessorListener* listener,
328 int flow_id,
329 int64_t offset_ms,
330 int send_limit_bytes)
331 : PacketSender(listener, flow_id),
332 cwnd_(10),
333 ssthresh_(kNoLimit),
334 ack_received_(false),
335 last_acked_seq_num_(0),
336 next_sequence_number_(0),
337 offset_ms_(offset_ms),
338 last_reduction_time_ms_(-1),
339 last_rtt_ms_(0),
340 total_sent_bytes_(0),
341 send_limit_bytes_(send_limit_bytes),
342 last_generated_packets_ms_(0),
343 num_recent_sent_packets_(0),
344 bitrate_kbps_(0) {
345 }
346
RunFor(int64_t time_ms,Packets * in_out)347 void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
348 if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) {
349 clock_.AdvanceTimeMilliseconds(time_ms);
350 if (running_) {
351 Pause();
352 }
353 return;
354 }
355
356 if (!running_ && total_sent_bytes_ == 0) {
357 Resume(offset_ms_);
358 }
359
360 int64_t start_time_ms = clock_.TimeInMilliseconds();
361
362 std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
363 in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin());
364 // The number of packets which are sent in during time_ms depends on the
365 // number of packets in_flight_ and the max number of packets in flight
366 // (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms.
367 for (FeedbackPacket* fb : feedbacks) {
368 clock_.AdvanceTimeMilliseconds(fb->send_time_ms() -
369 clock_.TimeInMilliseconds());
370 last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms();
371 UpdateCongestionControl(fb);
372 SendPackets(in_out);
373 }
374
375 for (auto it = in_flight_.begin(); it != in_flight_.end();) {
376 if (it->time_ms < clock_.TimeInMilliseconds() - 1000)
377 in_flight_.erase(it++);
378 else
379 ++it;
380 }
381
382 clock_.AdvanceTimeMilliseconds(time_ms -
383 (clock_.TimeInMilliseconds() - start_time_ms));
384 SendPackets(in_out);
385 }
386
SendPackets(Packets * in_out)387 void TcpSender::SendPackets(Packets* in_out) {
388 int cwnd = ceil(cwnd_);
389 int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
390 int timed_out = TriggerTimeouts();
391 if (timed_out > 0) {
392 HandleLoss();
393 }
394 if (packets_to_send > 0) {
395 Packets generated = GeneratePackets(packets_to_send);
396 for (Packet* packet : generated)
397 in_flight_.insert(InFlight(*static_cast<MediaPacket*>(packet)));
398
399 in_out->merge(generated, DereferencingComparator<Packet>);
400 }
401 }
402
UpdateCongestionControl(const FeedbackPacket * fb)403 void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) {
404 const TcpFeedback* tcp_fb = static_cast<const TcpFeedback*>(fb);
405 RTC_DCHECK(!tcp_fb->acked_packets().empty());
406 ack_received_ = true;
407
408 uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_;
409 uint16_t missing =
410 expected - static_cast<uint16_t>(tcp_fb->acked_packets().size());
411
412 for (uint16_t ack_seq_num : tcp_fb->acked_packets())
413 in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds()));
414
415 if (missing > 0) {
416 HandleLoss();
417 } else if (cwnd_ <= ssthresh_) {
418 cwnd_ += tcp_fb->acked_packets().size();
419 } else {
420 cwnd_ += 1.0f / cwnd_;
421 }
422
423 last_acked_seq_num_ =
424 LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_);
425 }
426
TriggerTimeouts()427 int TcpSender::TriggerTimeouts() {
428 int timed_out = 0;
429 for (auto it = in_flight_.begin(); it != in_flight_.end();) {
430 if (it->time_ms < clock_.TimeInMilliseconds() - 1000) {
431 in_flight_.erase(it++);
432 ++timed_out;
433 } else {
434 ++it;
435 }
436 }
437 return timed_out;
438 }
439
HandleLoss()440 void TcpSender::HandleLoss() {
441 if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_)
442 return;
443 last_reduction_time_ms_ = clock_.TimeInMilliseconds();
444 ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
445 cwnd_ = ssthresh_;
446 }
447
GeneratePackets(size_t num_packets)448 Packets TcpSender::GeneratePackets(size_t num_packets) {
449 Packets generated;
450
451 UpdateSendBitrateEstimate(num_packets);
452
453 for (size_t i = 0; i < num_packets; ++i) {
454 if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) {
455 if (running_) {
456 Pause();
457 }
458 break;
459 }
460 generated.push_back(
461 new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(),
462 kPacketSizeBytes, next_sequence_number_++));
463 generated.back()->set_sender_timestamp_us(
464 1000 * clock_.TimeInMilliseconds());
465
466 total_sent_bytes_ += kPacketSizeBytes;
467 }
468
469 return generated;
470 }
471
UpdateSendBitrateEstimate(size_t num_packets)472 void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) {
473 const int kTimeWindowMs = 500;
474 num_recent_sent_packets_ += num_packets;
475
476 int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_;
477 if (delta_ms >= kTimeWindowMs) {
478 bitrate_kbps_ =
479 static_cast<uint32_t>(8 * num_recent_sent_packets_ * kPacketSizeBytes) /
480 delta_ms;
481 last_generated_packets_ms_ = clock_.TimeInMilliseconds();
482 num_recent_sent_packets_ = 0;
483 }
484
485 RecordBitrate();
486 }
487
TargetBitrateKbps()488 uint32_t TcpSender::TargetBitrateKbps() {
489 return bitrate_kbps_;
490 }
491
492 } // namespace bwe
493 } // namespace testing
494 } // namespace webrtc
495