1 /*
2 * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "test/network/network_emulation.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16
17 #include "api/units/data_size.h"
18 #include "rtc_base/bind.h"
19 #include "rtc_base/logging.h"
20
21 namespace webrtc {
22
OnPacketReceived(EmulatedIpPacket packet)23 void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) {
24 task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
25 RTC_DCHECK_RUN_ON(task_queue_);
26
27 uint64_t packet_id = next_packet_id_++;
28 bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo(
29 packet.ip_packet_size(), packet.arrival_time.us(), packet_id));
30 if (sent) {
31 packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false});
32 }
33 if (process_task_.Running())
34 return;
35 absl::optional<int64_t> next_time_us =
36 network_behavior_->NextDeliveryTimeUs();
37 if (!next_time_us)
38 return;
39 Timestamp current_time = clock_->CurrentTime();
40 process_task_ = RepeatingTaskHandle::DelayedStart(
41 task_queue_->Get(),
42 std::max(TimeDelta::Zero(),
43 Timestamp::Micros(*next_time_us) - current_time),
44 [this]() {
45 RTC_DCHECK_RUN_ON(task_queue_);
46 Timestamp current_time = clock_->CurrentTime();
47 Process(current_time);
48 absl::optional<int64_t> next_time_us =
49 network_behavior_->NextDeliveryTimeUs();
50 if (!next_time_us) {
51 process_task_.Stop();
52 return TimeDelta::Zero(); // This is ignored.
53 }
54 RTC_DCHECK_GE(*next_time_us, current_time.us());
55 return Timestamp::Micros(*next_time_us) - current_time;
56 });
57 });
58 }
59
Process(Timestamp at_time)60 void LinkEmulation::Process(Timestamp at_time) {
61 std::vector<PacketDeliveryInfo> delivery_infos =
62 network_behavior_->DequeueDeliverablePackets(at_time.us());
63 for (PacketDeliveryInfo& delivery_info : delivery_infos) {
64 StoredPacket* packet = nullptr;
65 for (auto& stored_packet : packets_) {
66 if (stored_packet.id == delivery_info.packet_id) {
67 packet = &stored_packet;
68 break;
69 }
70 }
71 RTC_CHECK(packet);
72 RTC_DCHECK(!packet->removed);
73 packet->removed = true;
74
75 if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
76 packet->packet.arrival_time =
77 Timestamp::Micros(delivery_info.receive_time_us);
78 receiver_->OnPacketReceived(std::move(packet->packet));
79 }
80 while (!packets_.empty() && packets_.front().removed) {
81 packets_.pop_front();
82 }
83 }
84 }
85
NetworkRouterNode(rtc::TaskQueue * task_queue)86 NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue)
87 : task_queue_(task_queue) {}
88
OnPacketReceived(EmulatedIpPacket packet)89 void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) {
90 RTC_DCHECK_RUN_ON(task_queue_);
91 if (watcher_) {
92 watcher_(packet);
93 }
94 if (filter_) {
95 if (!filter_(packet))
96 return;
97 }
98 auto receiver_it = routing_.find(packet.to.ipaddr());
99 if (receiver_it == routing_.end()) {
100 return;
101 }
102 RTC_CHECK(receiver_it != routing_.end());
103
104 receiver_it->second->OnPacketReceived(std::move(packet));
105 }
106
SetReceiver(const rtc::IPAddress & dest_ip,EmulatedNetworkReceiverInterface * receiver)107 void NetworkRouterNode::SetReceiver(
108 const rtc::IPAddress& dest_ip,
109 EmulatedNetworkReceiverInterface* receiver) {
110 task_queue_->PostTask([=] {
111 RTC_DCHECK_RUN_ON(task_queue_);
112 EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
113 RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
114 << "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
115 routing_[dest_ip] = receiver;
116 });
117 }
118
RemoveReceiver(const rtc::IPAddress & dest_ip)119 void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) {
120 RTC_DCHECK_RUN_ON(task_queue_);
121 routing_.erase(dest_ip);
122 }
123
SetWatcher(std::function<void (const EmulatedIpPacket &)> watcher)124 void NetworkRouterNode::SetWatcher(
125 std::function<void(const EmulatedIpPacket&)> watcher) {
126 task_queue_->PostTask([=] {
127 RTC_DCHECK_RUN_ON(task_queue_);
128 watcher_ = watcher;
129 });
130 }
131
SetFilter(std::function<bool (const EmulatedIpPacket &)> filter)132 void NetworkRouterNode::SetFilter(
133 std::function<bool(const EmulatedIpPacket&)> filter) {
134 task_queue_->PostTask([=] {
135 RTC_DCHECK_RUN_ON(task_queue_);
136 filter_ = filter;
137 });
138 }
139
EmulatedNetworkNode(Clock * clock,rtc::TaskQueue * task_queue,std::unique_ptr<NetworkBehaviorInterface> network_behavior)140 EmulatedNetworkNode::EmulatedNetworkNode(
141 Clock* clock,
142 rtc::TaskQueue* task_queue,
143 std::unique_ptr<NetworkBehaviorInterface> network_behavior)
144 : router_(task_queue),
145 link_(clock, task_queue, std::move(network_behavior), &router_) {}
146
OnPacketReceived(EmulatedIpPacket packet)147 void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
148 link_.OnPacketReceived(std::move(packet));
149 }
150
CreateRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes,EmulatedNetworkReceiverInterface * receiver)151 void EmulatedNetworkNode::CreateRoute(
152 const rtc::IPAddress& receiver_ip,
153 std::vector<EmulatedNetworkNode*> nodes,
154 EmulatedNetworkReceiverInterface* receiver) {
155 RTC_CHECK(!nodes.empty());
156 for (size_t i = 0; i + 1 < nodes.size(); ++i)
157 nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]);
158 nodes.back()->router()->SetReceiver(receiver_ip, receiver);
159 }
160
ClearRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes)161 void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip,
162 std::vector<EmulatedNetworkNode*> nodes) {
163 for (EmulatedNetworkNode* node : nodes)
164 node->router()->RemoveReceiver(receiver_ip);
165 }
166
167 EmulatedNetworkNode::~EmulatedNetworkNode() = default;
168
EmulatedEndpointImpl(uint64_t id,const rtc::IPAddress & ip,bool is_enabled,rtc::AdapterType type,rtc::TaskQueue * task_queue,Clock * clock)169 EmulatedEndpointImpl::EmulatedEndpointImpl(uint64_t id,
170 const rtc::IPAddress& ip,
171 bool is_enabled,
172 rtc::AdapterType type,
173 rtc::TaskQueue* task_queue,
174 Clock* clock)
175 : id_(id),
176 peer_local_addr_(ip),
177 is_enabled_(is_enabled),
178 type_(type),
179 clock_(clock),
180 task_queue_(task_queue),
181 router_(task_queue_),
182 next_port_(kFirstEphemeralPort) {
183 constexpr int kIPv4NetworkPrefixLength = 24;
184 constexpr int kIPv6NetworkPrefixLength = 64;
185
186 int prefix_length = 0;
187 if (ip.family() == AF_INET) {
188 prefix_length = kIPv4NetworkPrefixLength;
189 } else if (ip.family() == AF_INET6) {
190 prefix_length = kIPv6NetworkPrefixLength;
191 }
192 rtc::IPAddress prefix = TruncateIP(ip, prefix_length);
193 network_ = std::make_unique<rtc::Network>(
194 ip.ToString(), "Endpoint id=" + std::to_string(id_), prefix,
195 prefix_length, type_);
196 network_->AddIP(ip);
197
198 enabled_state_checker_.Detach();
199 stats_.local_addresses.push_back(peer_local_addr_);
200 }
201 EmulatedEndpointImpl::~EmulatedEndpointImpl() = default;
202
GetId() const203 uint64_t EmulatedEndpointImpl::GetId() const {
204 return id_;
205 }
206
SendPacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,rtc::CopyOnWriteBuffer packet_data,uint16_t application_overhead)207 void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
208 const rtc::SocketAddress& to,
209 rtc::CopyOnWriteBuffer packet_data,
210 uint16_t application_overhead) {
211 RTC_CHECK(from.ipaddr() == peer_local_addr_);
212 EmulatedIpPacket packet(from, to, std::move(packet_data),
213 clock_->CurrentTime(), application_overhead);
214 task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
215 RTC_DCHECK_RUN_ON(task_queue_);
216 Timestamp current_time = clock_->CurrentTime();
217 if (stats_.first_packet_sent_time.IsInfinite()) {
218 stats_.first_packet_sent_time = current_time;
219 stats_.first_sent_packet_size = DataSize::Bytes(packet.ip_packet_size());
220 }
221 stats_.last_packet_sent_time = current_time;
222 stats_.packets_sent++;
223 stats_.bytes_sent += DataSize::Bytes(packet.ip_packet_size());
224
225 router_.OnPacketReceived(std::move(packet));
226 });
227 }
228
BindReceiver(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver)229 absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
230 uint16_t desired_port,
231 EmulatedNetworkReceiverInterface* receiver) {
232 rtc::CritScope crit(&receiver_lock_);
233 uint16_t port = desired_port;
234 if (port == 0) {
235 // Because client can specify its own port, next_port_ can be already in
236 // use, so we need to find next available port.
237 int ports_pool_size =
238 std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
239 for (int i = 0; i < ports_pool_size; ++i) {
240 uint16_t next_port = NextPort();
241 if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
242 port = next_port;
243 break;
244 }
245 }
246 }
247 RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
248 << id_;
249 bool result = port_to_receiver_.insert({port, receiver}).second;
250 if (!result) {
251 RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
252 << " in endpoint " << id_;
253 return absl::nullopt;
254 }
255 RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port "
256 << port;
257 return port;
258 }
259
NextPort()260 uint16_t EmulatedEndpointImpl::NextPort() {
261 uint16_t out = next_port_;
262 if (next_port_ == std::numeric_limits<uint16_t>::max()) {
263 next_port_ = kFirstEphemeralPort;
264 } else {
265 next_port_++;
266 }
267 return out;
268 }
269
UnbindReceiver(uint16_t port)270 void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
271 rtc::CritScope crit(&receiver_lock_);
272 port_to_receiver_.erase(port);
273 }
274
GetPeerLocalAddress() const275 rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const {
276 return peer_local_addr_;
277 }
278
OnPacketReceived(EmulatedIpPacket packet)279 void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
280 RTC_DCHECK_RUN_ON(task_queue_);
281 RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
282 << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
283 << packet.to.ipaddr().ToString()
284 << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
285 rtc::CritScope crit(&receiver_lock_);
286 UpdateReceiveStats(packet);
287 auto it = port_to_receiver_.find(packet.to.port());
288 if (it == port_to_receiver_.end()) {
289 // It can happen, that remote peer closed connection, but there still some
290 // packets, that are going to it. It can happen during peer connection close
291 // process: one peer closed connection, second still sending data.
292 RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
293 << " on port " << packet.to.port();
294 stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_dropped++;
295 stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_dropped +=
296 DataSize::Bytes(packet.ip_packet_size());
297 return;
298 }
299 // Endpoint assumes frequent calls to bind and unbind methods, so it holds
300 // lock during packet processing to ensure that receiver won't be deleted
301 // before call to OnPacketReceived.
302 it->second->OnPacketReceived(std::move(packet));
303 }
304
Enable()305 void EmulatedEndpointImpl::Enable() {
306 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
307 RTC_CHECK(!is_enabled_);
308 is_enabled_ = true;
309 }
310
Disable()311 void EmulatedEndpointImpl::Disable() {
312 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
313 RTC_CHECK(is_enabled_);
314 is_enabled_ = false;
315 }
316
Enabled() const317 bool EmulatedEndpointImpl::Enabled() const {
318 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
319 return is_enabled_;
320 }
321
stats()322 EmulatedNetworkStats EmulatedEndpointImpl::stats() {
323 RTC_DCHECK_RUN_ON(task_queue_);
324 return stats_;
325 }
326
UpdateReceiveStats(const EmulatedIpPacket & packet)327 void EmulatedEndpointImpl::UpdateReceiveStats(const EmulatedIpPacket& packet) {
328 RTC_DCHECK_RUN_ON(task_queue_);
329 Timestamp current_time = clock_->CurrentTime();
330 if (stats_.incoming_stats_per_source[packet.from.ipaddr()]
331 .first_packet_received_time.IsInfinite()) {
332 stats_.incoming_stats_per_source[packet.from.ipaddr()]
333 .first_packet_received_time = current_time;
334 stats_.incoming_stats_per_source[packet.from.ipaddr()]
335 .first_received_packet_size = DataSize::Bytes(packet.ip_packet_size());
336 }
337 stats_.incoming_stats_per_source[packet.from.ipaddr()]
338 .last_packet_received_time = current_time;
339 stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_received++;
340 stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_received +=
341 DataSize::Bytes(packet.ip_packet_size());
342 }
343
EndpointsContainer(const std::vector<EmulatedEndpointImpl * > & endpoints)344 EndpointsContainer::EndpointsContainer(
345 const std::vector<EmulatedEndpointImpl*>& endpoints)
346 : endpoints_(endpoints) {}
347
LookupByLocalAddress(const rtc::IPAddress & local_ip) const348 EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress(
349 const rtc::IPAddress& local_ip) const {
350 for (auto* endpoint : endpoints_) {
351 rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress();
352 if (peer_local_address == local_ip) {
353 return endpoint;
354 }
355 }
356 RTC_CHECK(false) << "No network found for address" << local_ip.ToString();
357 }
358
HasEndpoint(EmulatedEndpointImpl * endpoint) const359 bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const {
360 for (auto* e : endpoints_) {
361 if (e->GetId() == endpoint->GetId()) {
362 return true;
363 }
364 }
365 return false;
366 }
367
368 std::vector<std::unique_ptr<rtc::Network>>
GetEnabledNetworks() const369 EndpointsContainer::GetEnabledNetworks() const {
370 std::vector<std::unique_ptr<rtc::Network>> networks;
371 for (auto* endpoint : endpoints_) {
372 if (endpoint->Enabled()) {
373 networks.emplace_back(
374 std::make_unique<rtc::Network>(endpoint->network()));
375 }
376 }
377 return networks;
378 }
379
GetStats() const380 EmulatedNetworkStats EndpointsContainer::GetStats() const {
381 EmulatedNetworkStats stats;
382 for (auto* endpoint : endpoints_) {
383 EmulatedNetworkStats endpoint_stats = endpoint->stats();
384 stats.packets_sent += endpoint_stats.packets_sent;
385 stats.bytes_sent += endpoint_stats.bytes_sent;
386 if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
387 stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
388 stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
389 }
390 if (stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
391 stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
392 }
393 for (const rtc::IPAddress& addr : endpoint_stats.local_addresses) {
394 stats.local_addresses.push_back(addr);
395 }
396 for (auto& entry : endpoint_stats.incoming_stats_per_source) {
397 const EmulatedNetworkIncomingStats& source = entry.second;
398 EmulatedNetworkIncomingStats& in_stats =
399 stats.incoming_stats_per_source[entry.first];
400 in_stats.packets_received += source.packets_received;
401 in_stats.bytes_received += source.bytes_received;
402 in_stats.packets_dropped += source.packets_dropped;
403 in_stats.bytes_dropped += source.bytes_dropped;
404 if (in_stats.first_packet_received_time >
405 source.first_packet_received_time) {
406 in_stats.first_packet_received_time = source.first_packet_received_time;
407 in_stats.first_received_packet_size = source.first_received_packet_size;
408 }
409 if (in_stats.last_packet_received_time <
410 source.last_packet_received_time) {
411 in_stats.last_packet_received_time = source.last_packet_received_time;
412 }
413 }
414 }
415 return stats;
416 }
417
418 } // namespace webrtc
419