1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/modules/video_coding/main/test/rtp_player.h"
12
13 #include <stdio.h>
14
15 #include <map>
16
17 #include "webrtc/modules/rtp_rtcp/interface/rtp_header_parser.h"
18 #include "webrtc/modules/rtp_rtcp/interface/rtp_payload_registry.h"
19 #include "webrtc/modules/rtp_rtcp/interface/rtp_receiver.h"
20 #include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h"
21 #include "webrtc/modules/video_coding/main/source/internal_defines.h"
22 #include "webrtc/modules/video_coding/main/test/pcap_file_reader.h"
23 #include "webrtc/modules/video_coding/main/test/rtp_file_reader.h"
24 #include "webrtc/modules/video_coding/main/test/test_util.h"
25 #include "webrtc/system_wrappers/interface/clock.h"
26 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
27 #include "webrtc/system_wrappers/interface/scoped_ptr.h"
28
// Debug logging switch for this file. With the condition below set to 1 the
// first definition is active, so DEBUG_LOG1 expands to nothing and both of
// its arguments are discarded by the preprocessor (they are therefore not
// compiled or type-checked). Change the condition to 0 to print each message
// followed by a newline via printf.
#if 1
# define DEBUG_LOG1(text, arg)
#else
# define DEBUG_LOG1(text, arg) (printf(text "\n", arg))
#endif
34
35 namespace webrtc {
36 namespace rtpplayer {
37
// File-local constants.
enum {
  // Size, in bytes, of the buffer that holds the packet most recently read
  // from the packet source.
  kMaxPacketBufferSize = 4096,
  // Extension ID under which the transmission time offset header extension is
  // registered with each per-SSRC RTP header parser.
  kDefaultTransmissionTimeOffsetExtensionId = 2
};
42
43 class RawRtpPacket {
44 public:
RawRtpPacket(const uint8_t * data,uint32_t length,uint32_t ssrc,uint16_t seq_num)45 RawRtpPacket(const uint8_t* data, uint32_t length, uint32_t ssrc,
46 uint16_t seq_num)
47 : data_(new uint8_t[length]),
48 length_(length),
49 resend_time_ms_(-1),
50 ssrc_(ssrc),
51 seq_num_(seq_num) {
52 assert(data);
53 memcpy(data_.get(), data, length_);
54 }
55
data() const56 const uint8_t* data() const { return data_.get(); }
length() const57 uint32_t length() const { return length_; }
resend_time_ms() const58 int64_t resend_time_ms() const { return resend_time_ms_; }
set_resend_time_ms(int64_t timeMs)59 void set_resend_time_ms(int64_t timeMs) { resend_time_ms_ = timeMs; }
ssrc() const60 uint32_t ssrc() const { return ssrc_; }
seq_num() const61 uint16_t seq_num() const { return seq_num_; }
62
63 private:
64 scoped_ptr<uint8_t[]> data_;
65 uint32_t length_;
66 int64_t resend_time_ms_;
67 uint32_t ssrc_;
68 uint16_t seq_num_;
69
70 DISALLOW_IMPLICIT_CONSTRUCTORS(RawRtpPacket);
71 };
72
73 class LostPackets {
74 public:
LostPackets(Clock * clock,uint32_t rtt_ms)75 LostPackets(Clock* clock, uint32_t rtt_ms)
76 : crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
77 debug_file_(fopen("PacketLossDebug.txt", "w")),
78 loss_count_(0),
79 packets_(),
80 clock_(clock),
81 rtt_ms_(rtt_ms) {
82 assert(clock);
83 }
84
~LostPackets()85 ~LostPackets() {
86 if (debug_file_) {
87 fclose(debug_file_);
88 debug_file_ = NULL;
89 }
90 while (!packets_.empty()) {
91 delete packets_.back();
92 packets_.pop_back();
93 }
94 }
95
AddPacket(RawRtpPacket * packet)96 void AddPacket(RawRtpPacket* packet) {
97 assert(packet);
98 printf("Throw: %08x:%u\n", packet->ssrc(), packet->seq_num());
99 CriticalSectionScoped cs(crit_sect_.get());
100 if (debug_file_) {
101 fprintf(debug_file_, "%u Lost packet: %u\n", loss_count_,
102 packet->seq_num());
103 }
104 packets_.push_back(packet);
105 loss_count_++;
106 }
107
SetResendTime(uint32_t ssrc,int16_t resendSeqNum)108 void SetResendTime(uint32_t ssrc, int16_t resendSeqNum) {
109 int64_t resend_time_ms = clock_->TimeInMilliseconds() + rtt_ms_;
110 int64_t now_ms = clock_->TimeInMilliseconds();
111 CriticalSectionScoped cs(crit_sect_.get());
112 for (RtpPacketIterator it = packets_.begin(); it != packets_.end(); ++it) {
113 RawRtpPacket* packet = *it;
114 if (ssrc == packet->ssrc() && resendSeqNum == packet->seq_num() &&
115 packet->resend_time_ms() + 10 < now_ms) {
116 if (debug_file_) {
117 fprintf(debug_file_, "Resend %u at %u\n", packet->seq_num(),
118 MaskWord64ToUWord32(resend_time_ms));
119 }
120 packet->set_resend_time_ms(resend_time_ms);
121 return;
122 }
123 }
124 // We may get here since the captured stream may itself be missing packets.
125 }
126
NextPacketToResend(int64_t time_now)127 RawRtpPacket* NextPacketToResend(int64_t time_now) {
128 CriticalSectionScoped cs(crit_sect_.get());
129 for (RtpPacketIterator it = packets_.begin(); it != packets_.end(); ++it) {
130 RawRtpPacket* packet = *it;
131 if (time_now >= packet->resend_time_ms() &&
132 packet->resend_time_ms() != -1) {
133 packets_.erase(it);
134 return packet;
135 }
136 }
137 return NULL;
138 }
139
NumberOfPacketsToResend() const140 int NumberOfPacketsToResend() const {
141 CriticalSectionScoped cs(crit_sect_.get());
142 int count = 0;
143 for (ConstRtpPacketIterator it = packets_.begin(); it != packets_.end();
144 ++it) {
145 if ((*it)->resend_time_ms() >= 0) {
146 count++;
147 }
148 }
149 return count;
150 }
151
LogPacketResent(RawRtpPacket * packet)152 void LogPacketResent(RawRtpPacket* packet) {
153 int64_t now_ms = clock_->TimeInMilliseconds();
154 CriticalSectionScoped cs(crit_sect_.get());
155 if (debug_file_) {
156 fprintf(debug_file_, "Resent %u at %u\n", packet->seq_num(),
157 MaskWord64ToUWord32(now_ms));
158 }
159 }
160
Print() const161 void Print() const {
162 CriticalSectionScoped cs(crit_sect_.get());
163 printf("Lost packets: %u\n", loss_count_);
164 printf("Packets waiting to be resent: %d\n", NumberOfPacketsToResend());
165 printf("Packets still lost: %zd\n", packets_.size());
166 printf("Sequence numbers:\n");
167 for (ConstRtpPacketIterator it = packets_.begin(); it != packets_.end();
168 ++it) {
169 printf("%u, ", (*it)->seq_num());
170 }
171 printf("\n");
172 }
173
174 private:
175 typedef std::vector<RawRtpPacket*> RtpPacketList;
176 typedef RtpPacketList::iterator RtpPacketIterator;
177 typedef RtpPacketList::const_iterator ConstRtpPacketIterator;
178
179 scoped_ptr<CriticalSectionWrapper> crit_sect_;
180 FILE* debug_file_;
181 int loss_count_;
182 RtpPacketList packets_;
183 Clock* clock_;
184 uint32_t rtt_ms_;
185
186 DISALLOW_IMPLICIT_CONSTRUCTORS(LostPackets);
187 };
188
189 class SsrcHandlers {
190 public:
SsrcHandlers(PayloadSinkFactoryInterface * payload_sink_factory,const PayloadTypes & payload_types)191 SsrcHandlers(PayloadSinkFactoryInterface* payload_sink_factory,
192 const PayloadTypes& payload_types)
193 : payload_sink_factory_(payload_sink_factory),
194 payload_types_(payload_types),
195 handlers_() {
196 assert(payload_sink_factory);
197 }
198
~SsrcHandlers()199 ~SsrcHandlers() {
200 while (!handlers_.empty()) {
201 delete handlers_.begin()->second;
202 handlers_.erase(handlers_.begin());
203 }
204 }
205
RegisterSsrc(uint32_t ssrc,LostPackets * lost_packets,Clock * clock)206 int RegisterSsrc(uint32_t ssrc, LostPackets* lost_packets, Clock* clock) {
207 if (handlers_.count(ssrc) > 0) {
208 return 0;
209 }
210 DEBUG_LOG1("Registering handler for ssrc=%08x", ssrc);
211
212 scoped_ptr<Handler> handler(
213 new Handler(ssrc, payload_types_, lost_packets));
214 handler->payload_sink_.reset(payload_sink_factory_->Create(handler.get()));
215 if (handler->payload_sink_.get() == NULL) {
216 return -1;
217 }
218
219 RtpRtcp::Configuration configuration;
220 configuration.clock = clock;
221 configuration.id = 1;
222 configuration.audio = false;
223 handler->rtp_module_.reset(RtpReceiver::CreateVideoReceiver(
224 configuration.id, configuration.clock, handler->payload_sink_.get(),
225 NULL, handler->rtp_payload_registry_.get()));
226 if (handler->rtp_module_.get() == NULL) {
227 return -1;
228 }
229
230 handler->rtp_module_->SetNACKStatus(kNackOff);
231 handler->rtp_header_parser_->RegisterRtpHeaderExtension(
232 kRtpExtensionTransmissionTimeOffset,
233 kDefaultTransmissionTimeOffsetExtensionId);
234
235 for (PayloadTypesIterator it = payload_types_.begin();
236 it != payload_types_.end(); ++it) {
237 VideoCodec codec;
238 memset(&codec, 0, sizeof(codec));
239 strncpy(codec.plName, it->name().c_str(), sizeof(codec.plName)-1);
240 codec.plType = it->payload_type();
241 codec.codecType = it->codec_type();
242 if (handler->rtp_module_->RegisterReceivePayload(codec.plName,
243 codec.plType,
244 90000,
245 0,
246 codec.maxBitrate) < 0) {
247 return -1;
248 }
249 }
250
251 handlers_[ssrc] = handler.release();
252 return 0;
253 }
254
IncomingPacket(const uint8_t * data,uint32_t length)255 void IncomingPacket(const uint8_t* data, uint32_t length) {
256 for (HandlerMapIt it = handlers_.begin(); it != handlers_.end(); ++it) {
257 if (!it->second->rtp_header_parser_->IsRtcp(data, length)) {
258 RTPHeader header;
259 it->second->rtp_header_parser_->Parse(data, length, &header);
260 PayloadUnion payload_specific;
261 it->second->rtp_payload_registry_->GetPayloadSpecifics(
262 header.payloadType, &payload_specific);
263 it->second->rtp_module_->IncomingRtpPacket(header, data, length,
264 payload_specific, true);
265 }
266 }
267 }
268
269 private:
270 class Handler : public RtpStreamInterface {
271 public:
Handler(uint32_t ssrc,const PayloadTypes & payload_types,LostPackets * lost_packets)272 Handler(uint32_t ssrc, const PayloadTypes& payload_types,
273 LostPackets* lost_packets)
274 : rtp_header_parser_(RtpHeaderParser::Create()),
275 rtp_payload_registry_(new RTPPayloadRegistry(
276 RTPPayloadStrategy::CreateStrategy(false))),
277 rtp_module_(),
278 payload_sink_(),
279 ssrc_(ssrc),
280 payload_types_(payload_types),
281 lost_packets_(lost_packets) {
282 assert(lost_packets);
283 }
~Handler()284 virtual ~Handler() {}
285
ResendPackets(const uint16_t * sequence_numbers,uint16_t length)286 virtual void ResendPackets(const uint16_t* sequence_numbers,
287 uint16_t length) {
288 assert(sequence_numbers);
289 for (uint16_t i = 0; i < length; i++) {
290 lost_packets_->SetResendTime(ssrc_, sequence_numbers[i]);
291 }
292 }
293
ssrc() const294 virtual uint32_t ssrc() const { return ssrc_; }
payload_types() const295 virtual const PayloadTypes& payload_types() const {
296 return payload_types_;
297 }
298
299 scoped_ptr<RtpHeaderParser> rtp_header_parser_;
300 scoped_ptr<RTPPayloadRegistry> rtp_payload_registry_;
301 scoped_ptr<RtpReceiver> rtp_module_;
302 scoped_ptr<PayloadSinkInterface> payload_sink_;
303
304 private:
305 uint32_t ssrc_;
306 const PayloadTypes& payload_types_;
307 LostPackets* lost_packets_;
308
309 DISALLOW_COPY_AND_ASSIGN(Handler);
310 };
311
312 typedef std::map<uint32_t, Handler*> HandlerMap;
313 typedef std::map<uint32_t, Handler*>::iterator HandlerMapIt;
314
315 PayloadSinkFactoryInterface* payload_sink_factory_;
316 PayloadTypes payload_types_;
317 HandlerMap handlers_;
318
319 DISALLOW_IMPLICIT_CONSTRUCTORS(SsrcHandlers);
320 };
321
322 class RtpPlayerImpl : public RtpPlayerInterface {
323 public:
RtpPlayerImpl(PayloadSinkFactoryInterface * payload_sink_factory,const PayloadTypes & payload_types,Clock * clock,scoped_ptr<RtpPacketSourceInterface> * packet_source,float loss_rate,uint32_t rtt_ms,bool reordering)324 RtpPlayerImpl(PayloadSinkFactoryInterface* payload_sink_factory,
325 const PayloadTypes& payload_types, Clock* clock,
326 scoped_ptr<RtpPacketSourceInterface>* packet_source,
327 float loss_rate, uint32_t rtt_ms, bool reordering)
328 : ssrc_handlers_(payload_sink_factory, payload_types),
329 clock_(clock),
330 next_rtp_time_(0),
331 first_packet_(true),
332 first_packet_rtp_time_(0),
333 first_packet_time_ms_(0),
334 loss_rate_(loss_rate),
335 lost_packets_(clock, rtt_ms),
336 resend_packet_count_(0),
337 no_loss_startup_(100),
338 end_of_file_(false),
339 reordering_(false),
340 reorder_buffer_(),
341 next_packet_(),
342 next_packet_length_(0) {
343 assert(clock);
344 assert(packet_source);
345 assert(packet_source->get());
346 packet_source_.swap(*packet_source);
347 srand(321);
348 }
349
~RtpPlayerImpl()350 virtual ~RtpPlayerImpl() {}
351
NextPacket(int64_t time_now)352 virtual int NextPacket(int64_t time_now) {
353 // Send any packets ready to be resent.
354 RawRtpPacket* packet;
355 while ((packet = lost_packets_.NextPacketToResend(time_now))) {
356 int ret = SendPacket(packet->data(), packet->length());
357 if (ret > 0) {
358 printf("Resend: %08x:%u\n", packet->ssrc(), packet->seq_num());
359 lost_packets_.LogPacketResent(packet);
360 resend_packet_count_++;
361 }
362 delete packet;
363 if (ret < 0) {
364 return ret;
365 }
366 }
367
368 // Send any packets from packet source.
369 if (!end_of_file_ && (TimeUntilNextPacket() == 0 || first_packet_)) {
370 if (first_packet_) {
371 next_packet_length_ = sizeof(next_packet_);
372 if (packet_source_->NextPacket(next_packet_, &next_packet_length_,
373 &next_rtp_time_) != 0) {
374 return 0;
375 }
376 first_packet_rtp_time_ = next_rtp_time_;
377 first_packet_time_ms_ = clock_->TimeInMilliseconds();
378 first_packet_ = false;
379 }
380
381 if (reordering_ && reorder_buffer_.get() == NULL) {
382 reorder_buffer_.reset(new RawRtpPacket(next_packet_,
383 next_packet_length_, 0, 0));
384 return 0;
385 }
386 int ret = SendPacket(next_packet_, next_packet_length_);
387 if (reorder_buffer_.get()) {
388 SendPacket(reorder_buffer_->data(), reorder_buffer_->length());
389 reorder_buffer_.reset(NULL);
390 }
391 if (ret < 0) {
392 return ret;
393 }
394
395 next_packet_length_ = sizeof(next_packet_);
396 if (packet_source_->NextPacket(next_packet_, &next_packet_length_,
397 &next_rtp_time_) != 0) {
398 end_of_file_ = true;
399 return 0;
400 }
401 else if (next_packet_length_ == 0) {
402 return 0;
403 }
404 }
405
406 if (end_of_file_ && lost_packets_.NumberOfPacketsToResend() == 0) {
407 return 1;
408 }
409 return 0;
410 }
411
TimeUntilNextPacket() const412 virtual uint32_t TimeUntilNextPacket() const {
413 int64_t time_left = (next_rtp_time_ - first_packet_rtp_time_) -
414 (clock_->TimeInMilliseconds() - first_packet_time_ms_);
415 if (time_left < 0) {
416 return 0;
417 }
418 return static_cast<uint32_t>(time_left);
419 }
420
Print() const421 virtual void Print() const {
422 printf("Resent packets: %u\n", resend_packet_count_);
423 lost_packets_.Print();
424 }
425
426 private:
SendPacket(const uint8_t * data,uint32_t length)427 int SendPacket(const uint8_t* data, uint32_t length) {
428 assert(data);
429 assert(length > 0);
430
431 scoped_ptr<RtpHeaderParser> rtp_header_parser(RtpHeaderParser::Create());
432 if (!rtp_header_parser->IsRtcp(data, length)) {
433 RTPHeader header;
434 if (!rtp_header_parser->Parse(data, length, &header)) {
435 return -1;
436 }
437 uint32_t ssrc = header.ssrc;
438 if (ssrc_handlers_.RegisterSsrc(ssrc, &lost_packets_, clock_) < 0) {
439 DEBUG_LOG1("Unable to register ssrc: %d", ssrc);
440 return -1;
441 }
442
443 if (no_loss_startup_ > 0) {
444 no_loss_startup_--;
445 } else if ((rand() + 1.0)/(RAND_MAX + 1.0) < loss_rate_) {
446 uint16_t seq_num = header.sequenceNumber;
447 lost_packets_.AddPacket(new RawRtpPacket(data, length, ssrc, seq_num));
448 DEBUG_LOG1("Dropped packet: %d!", header.header.sequenceNumber);
449 return 0;
450 }
451 }
452
453 ssrc_handlers_.IncomingPacket(data, length);
454 return 1;
455 }
456
457 SsrcHandlers ssrc_handlers_;
458 Clock* clock_;
459 scoped_ptr<RtpPacketSourceInterface> packet_source_;
460 uint32_t next_rtp_time_;
461 bool first_packet_;
462 int64_t first_packet_rtp_time_;
463 int64_t first_packet_time_ms_;
464 float loss_rate_;
465 LostPackets lost_packets_;
466 uint32_t resend_packet_count_;
467 uint32_t no_loss_startup_;
468 bool end_of_file_;
469 bool reordering_;
470 scoped_ptr<RawRtpPacket> reorder_buffer_;
471 uint8_t next_packet_[kMaxPacketBufferSize];
472 uint32_t next_packet_length_;
473
474 DISALLOW_IMPLICIT_CONSTRUCTORS(RtpPlayerImpl);
475 };
476
Create(const std::string & input_filename,PayloadSinkFactoryInterface * payload_sink_factory,Clock * clock,const PayloadTypes & payload_types,float loss_rate,uint32_t rtt_ms,bool reordering)477 RtpPlayerInterface* Create(const std::string& input_filename,
478 PayloadSinkFactoryInterface* payload_sink_factory, Clock* clock,
479 const PayloadTypes& payload_types, float loss_rate, uint32_t rtt_ms,
480 bool reordering) {
481 scoped_ptr<RtpPacketSourceInterface> packet_source(
482 CreateRtpFileReader(input_filename));
483 if (packet_source.get() == NULL) {
484 packet_source.reset(CreatePcapFileReader(input_filename));
485 if (packet_source.get() == NULL) {
486 return NULL;
487 }
488 }
489
490 scoped_ptr<RtpPlayerImpl> impl(new RtpPlayerImpl(payload_sink_factory,
491 payload_types, clock, &packet_source, loss_rate, rtt_ms, reordering));
492 return impl.release();
493 }
494 } // namespace rtpplayer
495 } // namespace webrtc
496