1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "call/fake_network_pipe.h"
12
13 #include <string.h>
14
15 #include <algorithm>
16 #include <queue>
17 #include <utility>
18 #include <vector>
19
20 #include "api/media_types.h"
21 #include "modules/utility/include/process_thread.h"
22 #include "rtc_base/checks.h"
23 #include "rtc_base/logging.h"
24 #include "system_wrappers/include/clock.h"
25
26 namespace webrtc {
27
28 namespace {
29 constexpr int64_t kLogIntervalMs = 5000;
30 } // namespace
31
NetworkPacket(rtc::CopyOnWriteBuffer packet,int64_t send_time,int64_t arrival_time,absl::optional<PacketOptions> packet_options,bool is_rtcp,MediaType media_type,absl::optional<int64_t> packet_time_us,Transport * transport)32 NetworkPacket::NetworkPacket(rtc::CopyOnWriteBuffer packet,
33 int64_t send_time,
34 int64_t arrival_time,
35 absl::optional<PacketOptions> packet_options,
36 bool is_rtcp,
37 MediaType media_type,
38 absl::optional<int64_t> packet_time_us,
39 Transport* transport)
40 : packet_(std::move(packet)),
41 send_time_(send_time),
42 arrival_time_(arrival_time),
43 packet_options_(packet_options),
44 is_rtcp_(is_rtcp),
45 media_type_(media_type),
46 packet_time_us_(packet_time_us),
47 transport_(transport) {}
48
NetworkPacket(NetworkPacket && o)49 NetworkPacket::NetworkPacket(NetworkPacket&& o)
50 : packet_(std::move(o.packet_)),
51 send_time_(o.send_time_),
52 arrival_time_(o.arrival_time_),
53 packet_options_(o.packet_options_),
54 is_rtcp_(o.is_rtcp_),
55 media_type_(o.media_type_),
56 packet_time_us_(o.packet_time_us_),
57 transport_(o.transport_) {}
58
59 NetworkPacket::~NetworkPacket() = default;
60
operator =(NetworkPacket && o)61 NetworkPacket& NetworkPacket::operator=(NetworkPacket&& o) {
62 packet_ = std::move(o.packet_);
63 send_time_ = o.send_time_;
64 arrival_time_ = o.arrival_time_;
65 packet_options_ = o.packet_options_;
66 is_rtcp_ = o.is_rtcp_;
67 media_type_ = o.media_type_;
68 packet_time_us_ = o.packet_time_us_;
69 transport_ = o.transport_;
70
71 return *this;
72 }
73
FakeNetworkPipe(Clock * clock,std::unique_ptr<NetworkBehaviorInterface> network_behavior)74 FakeNetworkPipe::FakeNetworkPipe(
75 Clock* clock,
76 std::unique_ptr<NetworkBehaviorInterface> network_behavior)
77 : FakeNetworkPipe(clock, std::move(network_behavior), nullptr, 1) {}
78
FakeNetworkPipe(Clock * clock,std::unique_ptr<NetworkBehaviorInterface> network_behavior,PacketReceiver * receiver)79 FakeNetworkPipe::FakeNetworkPipe(
80 Clock* clock,
81 std::unique_ptr<NetworkBehaviorInterface> network_behavior,
82 PacketReceiver* receiver)
83 : FakeNetworkPipe(clock, std::move(network_behavior), receiver, 1) {}
84
FakeNetworkPipe(Clock * clock,std::unique_ptr<NetworkBehaviorInterface> network_behavior,PacketReceiver * receiver,uint64_t seed)85 FakeNetworkPipe::FakeNetworkPipe(
86 Clock* clock,
87 std::unique_ptr<NetworkBehaviorInterface> network_behavior,
88 PacketReceiver* receiver,
89 uint64_t seed)
90 : clock_(clock),
91 network_behavior_(std::move(network_behavior)),
92 receiver_(receiver),
93 global_transport_(nullptr),
94 clock_offset_ms_(0),
95 dropped_packets_(0),
96 sent_packets_(0),
97 total_packet_delay_us_(0),
98 last_log_time_us_(clock_->TimeInMicroseconds()) {}
99
FakeNetworkPipe(Clock * clock,std::unique_ptr<NetworkBehaviorInterface> network_behavior,Transport * transport)100 FakeNetworkPipe::FakeNetworkPipe(
101 Clock* clock,
102 std::unique_ptr<NetworkBehaviorInterface> network_behavior,
103 Transport* transport)
104 : clock_(clock),
105 network_behavior_(std::move(network_behavior)),
106 receiver_(nullptr),
107 global_transport_(transport),
108 clock_offset_ms_(0),
109 dropped_packets_(0),
110 sent_packets_(0),
111 total_packet_delay_us_(0),
112 last_log_time_us_(clock_->TimeInMicroseconds()) {
113 RTC_DCHECK(global_transport_);
114 AddActiveTransport(global_transport_);
115 }
116
~FakeNetworkPipe()117 FakeNetworkPipe::~FakeNetworkPipe() {
118 if (global_transport_) {
119 RemoveActiveTransport(global_transport_);
120 }
121 RTC_DCHECK(active_transports_.empty());
122 }
123
SetReceiver(PacketReceiver * receiver)124 void FakeNetworkPipe::SetReceiver(PacketReceiver* receiver) {
125 MutexLock lock(&config_lock_);
126 receiver_ = receiver;
127 }
128
AddActiveTransport(Transport * transport)129 void FakeNetworkPipe::AddActiveTransport(Transport* transport) {
130 MutexLock lock(&config_lock_);
131 active_transports_[transport]++;
132 }
133
RemoveActiveTransport(Transport * transport)134 void FakeNetworkPipe::RemoveActiveTransport(Transport* transport) {
135 MutexLock lock(&config_lock_);
136 auto it = active_transports_.find(transport);
137 RTC_CHECK(it != active_transports_.end());
138 if (--(it->second) == 0) {
139 active_transports_.erase(it);
140 }
141 }
142
SendRtp(const uint8_t * packet,size_t length,const PacketOptions & options)143 bool FakeNetworkPipe::SendRtp(const uint8_t* packet,
144 size_t length,
145 const PacketOptions& options) {
146 RTC_DCHECK(global_transport_);
147 EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), options, false,
148 global_transport_);
149 return true;
150 }
151
SendRtcp(const uint8_t * packet,size_t length)152 bool FakeNetworkPipe::SendRtcp(const uint8_t* packet, size_t length) {
153 RTC_DCHECK(global_transport_);
154 EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), absl::nullopt, true,
155 global_transport_);
156 return true;
157 }
158
SendRtp(const uint8_t * packet,size_t length,const PacketOptions & options,Transport * transport)159 bool FakeNetworkPipe::SendRtp(const uint8_t* packet,
160 size_t length,
161 const PacketOptions& options,
162 Transport* transport) {
163 RTC_DCHECK(transport);
164 EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), options, false,
165 transport);
166 return true;
167 }
168
SendRtcp(const uint8_t * packet,size_t length,Transport * transport)169 bool FakeNetworkPipe::SendRtcp(const uint8_t* packet,
170 size_t length,
171 Transport* transport) {
172 RTC_DCHECK(transport);
173 EnqueuePacket(rtc::CopyOnWriteBuffer(packet, length), absl::nullopt, true,
174 transport);
175 return true;
176 }
177
DeliverPacket(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)178 PacketReceiver::DeliveryStatus FakeNetworkPipe::DeliverPacket(
179 MediaType media_type,
180 rtc::CopyOnWriteBuffer packet,
181 int64_t packet_time_us) {
182 return EnqueuePacket(std::move(packet), absl::nullopt, false, media_type,
183 packet_time_us)
184 ? PacketReceiver::DELIVERY_OK
185 : PacketReceiver::DELIVERY_PACKET_ERROR;
186 }
187
SetClockOffset(int64_t offset_ms)188 void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) {
189 MutexLock lock(&config_lock_);
190 clock_offset_ms_ = offset_ms;
191 }
192
StoredPacket(NetworkPacket && packet)193 FakeNetworkPipe::StoredPacket::StoredPacket(NetworkPacket&& packet)
194 : packet(std::move(packet)) {}
195
EnqueuePacket(rtc::CopyOnWriteBuffer packet,absl::optional<PacketOptions> options,bool is_rtcp,MediaType media_type,absl::optional<int64_t> packet_time_us)196 bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
197 absl::optional<PacketOptions> options,
198 bool is_rtcp,
199 MediaType media_type,
200 absl::optional<int64_t> packet_time_us) {
201 MutexLock lock(&process_lock_);
202 int64_t time_now_us = clock_->TimeInMicroseconds();
203 return EnqueuePacket(NetworkPacket(std::move(packet), time_now_us,
204 time_now_us, options, is_rtcp, media_type,
205 packet_time_us, nullptr));
206 }
207
EnqueuePacket(rtc::CopyOnWriteBuffer packet,absl::optional<PacketOptions> options,bool is_rtcp,Transport * transport)208 bool FakeNetworkPipe::EnqueuePacket(rtc::CopyOnWriteBuffer packet,
209 absl::optional<PacketOptions> options,
210 bool is_rtcp,
211 Transport* transport) {
212 MutexLock lock(&process_lock_);
213 int64_t time_now_us = clock_->TimeInMicroseconds();
214 return EnqueuePacket(NetworkPacket(std::move(packet), time_now_us,
215 time_now_us, options, is_rtcp,
216 MediaType::ANY, absl::nullopt, transport));
217 }
218
EnqueuePacket(NetworkPacket && net_packet)219 bool FakeNetworkPipe::EnqueuePacket(NetworkPacket&& net_packet) {
220 int64_t send_time_us = net_packet.send_time();
221 size_t packet_size = net_packet.data_length();
222
223 packets_in_flight_.emplace_back(StoredPacket(std::move(net_packet)));
224 int64_t packet_id = reinterpret_cast<uint64_t>(&packets_in_flight_.back());
225 bool sent = network_behavior_->EnqueuePacket(
226 PacketInFlightInfo(packet_size, send_time_us, packet_id));
227
228 if (!sent) {
229 packets_in_flight_.pop_back();
230 ++dropped_packets_;
231 }
232 return sent;
233 }
234
PercentageLoss()235 float FakeNetworkPipe::PercentageLoss() {
236 MutexLock lock(&process_lock_);
237 if (sent_packets_ == 0)
238 return 0;
239
240 return static_cast<float>(dropped_packets_) /
241 (sent_packets_ + dropped_packets_);
242 }
243
AverageDelay()244 int FakeNetworkPipe::AverageDelay() {
245 MutexLock lock(&process_lock_);
246 if (sent_packets_ == 0)
247 return 0;
248
249 return static_cast<int>(total_packet_delay_us_ /
250 (1000 * static_cast<int64_t>(sent_packets_)));
251 }
252
DroppedPackets()253 size_t FakeNetworkPipe::DroppedPackets() {
254 MutexLock lock(&process_lock_);
255 return dropped_packets_;
256 }
257
SentPackets()258 size_t FakeNetworkPipe::SentPackets() {
259 MutexLock lock(&process_lock_);
260 return sent_packets_;
261 }
262
Process()263 void FakeNetworkPipe::Process() {
264 int64_t time_now_us;
265 std::queue<NetworkPacket> packets_to_deliver;
266 {
267 MutexLock lock(&process_lock_);
268 time_now_us = clock_->TimeInMicroseconds();
269 if (time_now_us - last_log_time_us_ > kLogIntervalMs * 1000) {
270 int64_t queueing_delay_us = 0;
271 if (!packets_in_flight_.empty())
272 queueing_delay_us =
273 time_now_us - packets_in_flight_.front().packet.send_time();
274
275 RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_us / 1000
276 << " ms.";
277 last_log_time_us_ = time_now_us;
278 }
279
280 std::vector<PacketDeliveryInfo> delivery_infos =
281 network_behavior_->DequeueDeliverablePackets(time_now_us);
282 for (auto& delivery_info : delivery_infos) {
283 // In the common case where no reordering happens, find will return early
284 // as the first packet will be a match.
285 auto packet_it =
286 std::find_if(packets_in_flight_.begin(), packets_in_flight_.end(),
287 [&delivery_info](StoredPacket& packet_ref) {
288 return reinterpret_cast<uint64_t>(&packet_ref) ==
289 delivery_info.packet_id;
290 });
291 // Check that the packet is in the deque of packets in flight.
292 RTC_CHECK(packet_it != packets_in_flight_.end());
293 // Check that the packet is not already removed.
294 RTC_DCHECK(!packet_it->removed);
295
296 NetworkPacket packet = std::move(packet_it->packet);
297 packet_it->removed = true;
298
299 // Cleanup of removed packets at the beginning of the deque.
300 while (!packets_in_flight_.empty() &&
301 packets_in_flight_.front().removed) {
302 packets_in_flight_.pop_front();
303 }
304
305 if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
306 int64_t added_delay_us =
307 delivery_info.receive_time_us - packet.send_time();
308 packet.IncrementArrivalTime(added_delay_us);
309 packets_to_deliver.emplace(std::move(packet));
310 // |time_now_us| might be later than when the packet should have
311 // arrived, due to NetworkProcess being called too late. For stats, use
312 // the time it should have been on the link.
313 total_packet_delay_us_ += added_delay_us;
314 ++sent_packets_;
315 } else {
316 ++dropped_packets_;
317 }
318 }
319 }
320
321 MutexLock lock(&config_lock_);
322 while (!packets_to_deliver.empty()) {
323 NetworkPacket packet = std::move(packets_to_deliver.front());
324 packets_to_deliver.pop();
325 DeliverNetworkPacket(&packet);
326 }
327 }
328
DeliverNetworkPacket(NetworkPacket * packet)329 void FakeNetworkPipe::DeliverNetworkPacket(NetworkPacket* packet) {
330 Transport* transport = packet->transport();
331 if (transport) {
332 RTC_DCHECK(!receiver_);
333 if (active_transports_.find(transport) == active_transports_.end()) {
334 // Transport has been destroyed, ignore this packet.
335 return;
336 }
337 if (packet->is_rtcp()) {
338 transport->SendRtcp(packet->data(), packet->data_length());
339 } else {
340 transport->SendRtp(packet->data(), packet->data_length(),
341 packet->packet_options());
342 }
343 } else if (receiver_) {
344 int64_t packet_time_us = packet->packet_time_us().value_or(-1);
345 if (packet_time_us != -1) {
346 int64_t queue_time_us = packet->arrival_time() - packet->send_time();
347 RTC_CHECK(queue_time_us >= 0);
348 packet_time_us += queue_time_us;
349 packet_time_us += (clock_offset_ms_ * 1000);
350 }
351 receiver_->DeliverPacket(packet->media_type(),
352 std::move(*packet->raw_packet()), packet_time_us);
353 }
354 }
355
TimeUntilNextProcess()356 absl::optional<int64_t> FakeNetworkPipe::TimeUntilNextProcess() {
357 MutexLock lock(&process_lock_);
358 absl::optional<int64_t> delivery_us = network_behavior_->NextDeliveryTimeUs();
359 if (delivery_us) {
360 int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds();
361 return std::max<int64_t>((delay_us + 500) / 1000, 0);
362 }
363 return absl::nullopt;
364 }
365
HasReceiver() const366 bool FakeNetworkPipe::HasReceiver() const {
367 MutexLock lock(&config_lock_);
368 return receiver_ != nullptr;
369 }
370
DeliverPacketWithLock(NetworkPacket * packet)371 void FakeNetworkPipe::DeliverPacketWithLock(NetworkPacket* packet) {
372 MutexLock lock(&config_lock_);
373 DeliverNetworkPacket(packet);
374 }
375
ResetStats()376 void FakeNetworkPipe::ResetStats() {
377 MutexLock lock(&process_lock_);
378 dropped_packets_ = 0;
379 sent_packets_ = 0;
380 total_packet_delay_us_ = 0;
381 }
382
GetTimeInMicroseconds() const383 int64_t FakeNetworkPipe::GetTimeInMicroseconds() const {
384 return clock_->TimeInMicroseconds();
385 }
386
387 } // namespace webrtc
388