1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "test/network/cross_traffic.h"
12
13 #include <math.h>
14
15 #include <utility>
16
17 #include "absl/memory/memory.h"
18 #include "absl/types/optional.h"
19 #include "cross_traffic.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/numerics/safe_minmax.h"
22
23 namespace webrtc {
24 namespace test {
25
RandomWalkCrossTraffic(RandomWalkConfig config,TrafficRoute * traffic_route)26 RandomWalkCrossTraffic::RandomWalkCrossTraffic(RandomWalkConfig config,
27 TrafficRoute* traffic_route)
28 : config_(config),
29 traffic_route_(traffic_route),
30 random_(config_.random_seed) {
31 sequence_checker_.Detach();
32 }
33 RandomWalkCrossTraffic::~RandomWalkCrossTraffic() = default;
34
Process(Timestamp at_time)35 void RandomWalkCrossTraffic::Process(Timestamp at_time) {
36 RTC_DCHECK_RUN_ON(&sequence_checker_);
37 if (last_process_time_.IsMinusInfinity()) {
38 last_process_time_ = at_time;
39 }
40 TimeDelta delta = at_time - last_process_time_;
41 last_process_time_ = at_time;
42
43 if (at_time - last_update_time_ >= config_.update_interval) {
44 intensity_ += random_.Gaussian(config_.bias, config_.variance) *
45 sqrt((at_time - last_update_time_).seconds<double>());
46 intensity_ = rtc::SafeClamp(intensity_, 0.0, 1.0);
47 last_update_time_ = at_time;
48 }
49 pending_size_ += TrafficRate() * delta;
50
51 if (pending_size_ >= config_.min_packet_size &&
52 at_time >= last_send_time_ + config_.min_packet_interval) {
53 traffic_route_->SendPacket(pending_size_.bytes());
54 pending_size_ = DataSize::Zero();
55 last_send_time_ = at_time;
56 }
57 }
58
TrafficRate() const59 DataRate RandomWalkCrossTraffic::TrafficRate() const {
60 RTC_DCHECK_RUN_ON(&sequence_checker_);
61 return config_.peak_rate * intensity_;
62 }
63
StatsPrinter()64 ColumnPrinter RandomWalkCrossTraffic::StatsPrinter() {
65 return ColumnPrinter::Lambda(
66 "random_walk_cross_traffic_rate",
67 [this](rtc::SimpleStringBuilder& sb) {
68 sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
69 },
70 32);
71 }
72
PulsedPeaksCrossTraffic(PulsedPeaksConfig config,TrafficRoute * traffic_route)73 PulsedPeaksCrossTraffic::PulsedPeaksCrossTraffic(PulsedPeaksConfig config,
74 TrafficRoute* traffic_route)
75 : config_(config), traffic_route_(traffic_route) {
76 sequence_checker_.Detach();
77 }
78 PulsedPeaksCrossTraffic::~PulsedPeaksCrossTraffic() = default;
79
Process(Timestamp at_time)80 void PulsedPeaksCrossTraffic::Process(Timestamp at_time) {
81 RTC_DCHECK_RUN_ON(&sequence_checker_);
82 TimeDelta time_since_toggle = at_time - last_update_time_;
83 if (time_since_toggle.IsInfinite() ||
84 (sending_ && time_since_toggle >= config_.send_duration)) {
85 sending_ = false;
86 last_update_time_ = at_time;
87 } else if (!sending_ && time_since_toggle >= config_.hold_duration) {
88 sending_ = true;
89 last_update_time_ = at_time;
90 // Start sending period.
91 last_send_time_ = at_time;
92 }
93
94 if (sending_) {
95 DataSize pending_size = config_.peak_rate * (at_time - last_send_time_);
96
97 if (pending_size >= config_.min_packet_size &&
98 at_time >= last_send_time_ + config_.min_packet_interval) {
99 traffic_route_->SendPacket(pending_size.bytes());
100 last_send_time_ = at_time;
101 }
102 }
103 }
104
TrafficRate() const105 DataRate PulsedPeaksCrossTraffic::TrafficRate() const {
106 RTC_DCHECK_RUN_ON(&sequence_checker_);
107 return sending_ ? config_.peak_rate : DataRate::Zero();
108 }
109
StatsPrinter()110 ColumnPrinter PulsedPeaksCrossTraffic::StatsPrinter() {
111 return ColumnPrinter::Lambda(
112 "pulsed_peaks_cross_traffic_rate",
113 [this](rtc::SimpleStringBuilder& sb) {
114 sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
115 },
116 32);
117 }
118
TcpMessageRouteImpl(Clock * clock,TaskQueueBase * task_queue,EmulatedRoute * send_route,EmulatedRoute * ret_route)119 TcpMessageRouteImpl::TcpMessageRouteImpl(Clock* clock,
120 TaskQueueBase* task_queue,
121 EmulatedRoute* send_route,
122 EmulatedRoute* ret_route)
123 : clock_(clock),
124 task_queue_(task_queue),
125 request_route_(send_route,
126 [this](TcpPacket packet, Timestamp) {
127 OnRequest(std::move(packet));
128 }),
129 response_route_(ret_route,
__anon4392d0b20402(TcpPacket packet, Timestamp arrival_time) 130 [this](TcpPacket packet, Timestamp arrival_time) {
131 OnResponse(std::move(packet), arrival_time);
132 }) {}
133
SendMessage(size_t size,std::function<void ()> on_received)134 void TcpMessageRouteImpl::SendMessage(size_t size,
135 std::function<void()> on_received) {
136 task_queue_->PostTask(
137 ToQueuedTask([this, size, handler = std::move(on_received)] {
138 // If we are currently sending a message we won't reset the connection,
139 // we'll act as if the messages are sent in the same TCP stream. This is
140 // intended to simulate recreation of a TCP session for each message
141 // in the typical case while avoiding the complexity overhead of
142 // maintaining multiple virtual TCP sessions in parallel.
143 if (pending_.empty() && in_flight_.empty()) {
144 cwnd_ = 10;
145 ssthresh_ = INFINITY;
146 }
147 int64_t data_left = static_cast<int64_t>(size);
148 int64_t kMaxPacketSize = 1200;
149 int64_t kMinPacketSize = 4;
150 Message message{std::move(handler)};
151 while (data_left > 0) {
152 int64_t packet_size = std::min(data_left, kMaxPacketSize);
153 int fragment_id = next_fragment_id_++;
154 pending_.push_back(MessageFragment{
155 fragment_id,
156 static_cast<size_t>(std::max(kMinPacketSize, packet_size))});
157 message.pending_fragment_ids.insert(fragment_id);
158 data_left -= packet_size;
159 }
160 messages_.emplace_back(message);
161 SendPackets(clock_->CurrentTime());
162 }));
163 }
164
OnRequest(TcpPacket packet_info)165 void TcpMessageRouteImpl::OnRequest(TcpPacket packet_info) {
166 for (auto it = messages_.begin(); it != messages_.end(); ++it) {
167 if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
168 it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
169 if (it->pending_fragment_ids.empty()) {
170 it->handler();
171 messages_.erase(it);
172 }
173 break;
174 }
175 }
176 const size_t kAckPacketSize = 20;
177 response_route_.SendPacket(kAckPacketSize, packet_info);
178 }
179
OnResponse(TcpPacket packet_info,Timestamp at_time)180 void TcpMessageRouteImpl::OnResponse(TcpPacket packet_info, Timestamp at_time) {
181 auto it = in_flight_.find(packet_info.sequence_number);
182 if (it != in_flight_.end()) {
183 last_rtt_ = at_time - packet_info.send_time;
184 in_flight_.erase(it);
185 }
186 auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
187 for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
188 lost_it = in_flight_.erase(lost_it)) {
189 pending_.push_front(lost_it->second.fragment);
190 }
191
192 if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
193 HandleLoss(at_time);
194 } else if (cwnd_ <= ssthresh_) {
195 cwnd_ += 1;
196 } else {
197 cwnd_ += 1.0f / cwnd_;
198 }
199 last_acked_seq_num_ =
200 std::max(packet_info.sequence_number, last_acked_seq_num_);
201 SendPackets(at_time);
202 }
203
HandleLoss(Timestamp at_time)204 void TcpMessageRouteImpl::HandleLoss(Timestamp at_time) {
205 if (at_time - last_reduction_time_ < last_rtt_)
206 return;
207 last_reduction_time_ = at_time;
208 ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
209 cwnd_ = ssthresh_;
210 }
211
SendPackets(Timestamp at_time)212 void TcpMessageRouteImpl::SendPackets(Timestamp at_time) {
213 const TimeDelta kPacketTimeout = TimeDelta::Seconds(1);
214 int cwnd = std::ceil(cwnd_);
215 int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
216 while (packets_to_send-- > 0 && !pending_.empty()) {
217 auto seq_num = next_sequence_number_++;
218 TcpPacket send;
219 send.sequence_number = seq_num;
220 send.send_time = at_time;
221 send.fragment = pending_.front();
222 pending_.pop_front();
223 request_route_.SendPacket(send.fragment.size, send);
224 in_flight_.insert({seq_num, send});
225 task_queue_->PostDelayedTask(ToQueuedTask([this, seq_num] {
226 HandlePacketTimeout(seq_num,
227 clock_->CurrentTime());
228 }),
229 kPacketTimeout.ms());
230 }
231 }
232
HandlePacketTimeout(int seq_num,Timestamp at_time)233 void TcpMessageRouteImpl::HandlePacketTimeout(int seq_num, Timestamp at_time) {
234 auto lost = in_flight_.find(seq_num);
235 if (lost != in_flight_.end()) {
236 pending_.push_front(lost->second.fragment);
237 in_flight_.erase(lost);
238 HandleLoss(at_time);
239 SendPackets(at_time);
240 }
241 }
242
FakeTcpCrossTraffic(Clock * clock,FakeTcpConfig config,EmulatedRoute * send_route,EmulatedRoute * ret_route)243 FakeTcpCrossTraffic::FakeTcpCrossTraffic(Clock* clock,
244 FakeTcpConfig config,
245 EmulatedRoute* send_route,
246 EmulatedRoute* ret_route)
247 : clock_(clock), conf_(config), route_(this, send_route, ret_route) {}
248
Start(TaskQueueBase * task_queue)249 void FakeTcpCrossTraffic::Start(TaskQueueBase* task_queue) {
250 repeating_task_handle_ = RepeatingTaskHandle::Start(task_queue, [this] {
251 Process(clock_->CurrentTime());
252 return conf_.process_interval;
253 });
254 }
255
Stop()256 void FakeTcpCrossTraffic::Stop() {
257 repeating_task_handle_.Stop();
258 }
259
Process(Timestamp at_time)260 void FakeTcpCrossTraffic::Process(Timestamp at_time) {
261 SendPackets(at_time);
262 }
263
OnRequest(int sequence_number,Timestamp at_time)264 void FakeTcpCrossTraffic::OnRequest(int sequence_number, Timestamp at_time) {
265 const size_t kAckPacketSize = 20;
266 route_.SendResponse(kAckPacketSize, sequence_number);
267 }
268
OnResponse(int sequence_number,Timestamp at_time)269 void FakeTcpCrossTraffic::OnResponse(int sequence_number, Timestamp at_time) {
270 ack_received_ = true;
271 auto it = in_flight_.find(sequence_number);
272 if (it != in_flight_.end()) {
273 last_rtt_ = at_time - in_flight_.at(sequence_number);
274 in_flight_.erase(sequence_number);
275 }
276 if (sequence_number - last_acked_seq_num_ > 1) {
277 HandleLoss(at_time);
278 } else if (cwnd_ <= ssthresh_) {
279 cwnd_ += 1;
280 } else {
281 cwnd_ += 1.0f / cwnd_;
282 }
283 last_acked_seq_num_ = std::max(sequence_number, last_acked_seq_num_);
284 SendPackets(at_time);
285 }
286
HandleLoss(Timestamp at_time)287 void FakeTcpCrossTraffic::HandleLoss(Timestamp at_time) {
288 if (at_time - last_reduction_time_ < last_rtt_)
289 return;
290 last_reduction_time_ = at_time;
291 ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
292 cwnd_ = ssthresh_;
293 }
294
SendPackets(Timestamp at_time)295 void FakeTcpCrossTraffic::SendPackets(Timestamp at_time) {
296 int cwnd = std::ceil(cwnd_);
297 int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
298 bool timeouts = false;
299 for (auto it = in_flight_.begin(); it != in_flight_.end();) {
300 if (it->second < at_time - conf_.packet_timeout) {
301 it = in_flight_.erase(it);
302 timeouts = true;
303 } else {
304 ++it;
305 }
306 }
307 if (timeouts)
308 HandleLoss(at_time);
309 for (int i = 0; i < packets_to_send; ++i) {
310 if ((total_sent_ + conf_.packet_size) > conf_.send_limit) {
311 break;
312 }
313 in_flight_.insert({next_sequence_number_, at_time});
314 route_.SendRequest(conf_.packet_size.bytes<size_t>(),
315 next_sequence_number_++);
316 total_sent_ += conf_.packet_size;
317 }
318 }
319
320 } // namespace test
321 } // namespace webrtc
322