• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/media/webrtc_rtp_dump_writer.h"
6 
7 #include "base/big_endian.h"
8 #include "base/files/file_util.h"
9 #include "base/logging.h"
10 #include "content/public/browser/browser_thread.h"
11 #include "third_party/zlib/zlib.h"
12 
13 using content::BrowserThread;
14 
15 namespace {
16 
17 static const size_t kMinimumGzipOutputBufferSize = 256;  // In bytes.
18 
19 const unsigned char kRtpDumpFileHeaderFirstLine[] = "#!rtpplay1.0 0.0.0.0/0\n";
20 static const size_t kRtpDumpFileHeaderSize = 16;  // In bytes.
21 
22 // A helper for writing the header of the dump file.
WriteRtpDumpFileHeaderBigEndian(base::TimeTicks start,std::vector<uint8> * output)23 void WriteRtpDumpFileHeaderBigEndian(base::TimeTicks start,
24                                      std::vector<uint8>* output) {
25   size_t buffer_start_pos = output->size();
26   output->resize(output->size() + kRtpDumpFileHeaderSize);
27 
28   char* buffer = reinterpret_cast<char*>(&(*output)[buffer_start_pos]);
29 
30   base::TimeDelta delta = start - base::TimeTicks();
31   uint32 start_sec = delta.InSeconds();
32   base::WriteBigEndian(buffer, start_sec);
33   buffer += sizeof(start_sec);
34 
35   uint32 start_usec =
36       delta.InMilliseconds() * base::Time::kMicrosecondsPerMillisecond;
37   base::WriteBigEndian(buffer, start_usec);
38   buffer += sizeof(start_usec);
39 
40   // Network source, always 0.
41   base::WriteBigEndian(buffer, uint32(0));
42   buffer += sizeof(uint32);
43 
44   // UDP port, always 0.
45   base::WriteBigEndian(buffer, uint16(0));
46   buffer += sizeof(uint16);
47 
48   // 2 bytes padding.
49   base::WriteBigEndian(buffer, uint16(0));
50 }
51 
52 // The header size for each packet dump.
53 static const size_t kPacketDumpHeaderSize = 8;  // In bytes.
54 
55 // A helper for writing the header for each packet dump.
56 // |start| is the time when the recording is started.
57 // |dump_length| is the length of the packet dump including this header.
58 // |packet_length| is the length of the RTP packet header.
WritePacketDumpHeaderBigEndian(const base::TimeTicks & start,uint16 dump_length,uint16 packet_length,std::vector<uint8> * output)59 void WritePacketDumpHeaderBigEndian(const base::TimeTicks& start,
60                                     uint16 dump_length,
61                                     uint16 packet_length,
62                                     std::vector<uint8>* output) {
63   size_t buffer_start_pos = output->size();
64   output->resize(output->size() + kPacketDumpHeaderSize);
65 
66   char* buffer = reinterpret_cast<char*>(&(*output)[buffer_start_pos]);
67 
68   base::WriteBigEndian(buffer, dump_length);
69   buffer += sizeof(dump_length);
70 
71   base::WriteBigEndian(buffer, packet_length);
72   buffer += sizeof(packet_length);
73 
74   uint32 elapsed =
75       static_cast<uint32>((base::TimeTicks::Now() - start).InMilliseconds());
76   base::WriteBigEndian(buffer, elapsed);
77 }
78 
79 // Append |src_len| bytes from |src| to |dest|.
AppendToBuffer(const uint8 * src,size_t src_len,std::vector<uint8> * dest)80 void AppendToBuffer(const uint8* src,
81                     size_t src_len,
82                     std::vector<uint8>* dest) {
83   size_t old_dest_size = dest->size();
84   dest->resize(old_dest_size + src_len);
85   memcpy(&(*dest)[old_dest_size], src, src_len);
86 }
87 
88 }  // namespace
89 
90 // This class is running on the FILE thread for compressing and writing the
91 // dump buffer to disk.
92 class WebRtcRtpDumpWriter::FileThreadWorker {
93  public:
FileThreadWorker(const base::FilePath & dump_path)94   explicit FileThreadWorker(const base::FilePath& dump_path)
95       : dump_path_(dump_path) {
96     thread_checker_.DetachFromThread();
97 
98     memset(&stream_, 0, sizeof(stream_));
99     int result = deflateInit2(&stream_,
100                               Z_DEFAULT_COMPRESSION,
101                               Z_DEFLATED,
102                               // windowBits = 15 is default, 16 is added to
103                               // produce a gzip header + trailer.
104                               15 + 16,
105                               8,  // memLevel = 8 is default.
106                               Z_DEFAULT_STRATEGY);
107     DCHECK_EQ(Z_OK, result);
108   }
109 
~FileThreadWorker()110   ~FileThreadWorker() {
111     DCHECK(thread_checker_.CalledOnValidThread());
112 
113     // Makes sure all allocations are freed.
114     deflateEnd(&stream_);
115   }
116 
117   // Compresses the data in |buffer| and write to the dump file. If |end_stream|
118   // is true, the compression stream will be ended and the dump file cannot be
119   // written to any more.
CompressAndWriteToFileOnFileThread(scoped_ptr<std::vector<uint8>> buffer,bool end_stream,FlushResult * result,size_t * bytes_written)120   void CompressAndWriteToFileOnFileThread(
121       scoped_ptr<std::vector<uint8> > buffer,
122       bool end_stream,
123       FlushResult* result,
124       size_t* bytes_written) {
125     DCHECK(thread_checker_.CalledOnValidThread());
126 
127     // This is called either when the in-memory buffer is full or the dump
128     // should be ended.
129     DCHECK(!buffer->empty() || end_stream);
130 
131     *result = FLUSH_RESULT_SUCCESS;
132     *bytes_written = 0;
133 
134     // There may be nothing to compress/write if there is no RTP packet since
135     // the last flush.
136     if (!buffer->empty()) {
137       *bytes_written = CompressAndWriteBufferToFile(buffer.get(), result);
138     } else if (!base::PathExists(dump_path_)) {
139       // If the dump does not exist, it means there is no RTP packet recorded.
140       // Return FLUSH_RESULT_NO_DATA to indicate no dump file created.
141       *result = FLUSH_RESULT_NO_DATA;
142     }
143 
144     if (end_stream && !EndDumpFile())
145       *result = FLUSH_RESULT_FAILURE;
146   }
147 
148  private:
149   // Helper for CompressAndWriteToFileOnFileThread to compress and write one
150   // dump.
CompressAndWriteBufferToFile(std::vector<uint8> * buffer,FlushResult * result)151   size_t CompressAndWriteBufferToFile(std::vector<uint8>* buffer,
152                                       FlushResult* result) {
153     DCHECK(thread_checker_.CalledOnValidThread());
154     DCHECK(buffer->size());
155 
156     *result = FLUSH_RESULT_SUCCESS;
157 
158     std::vector<uint8> compressed_buffer;
159     if (!Compress(buffer, &compressed_buffer)) {
160       DVLOG(2) << "Compressing buffer failed.";
161       *result = FLUSH_RESULT_FAILURE;
162       return 0;
163     }
164 
165     int bytes_written = -1;
166 
167     if (base::PathExists(dump_path_)) {
168       bytes_written = base::AppendToFile(
169           dump_path_,
170           reinterpret_cast<const char*>(&compressed_buffer[0]),
171           compressed_buffer.size());
172     } else {
173       bytes_written = base::WriteFile(
174           dump_path_,
175           reinterpret_cast<const char*>(&compressed_buffer[0]),
176           compressed_buffer.size());
177     }
178 
179     if (bytes_written == -1) {
180       DVLOG(2) << "Writing file failed: " << dump_path_.value();
181       *result = FLUSH_RESULT_FAILURE;
182       return 0;
183     }
184 
185     DCHECK_EQ(static_cast<size_t>(bytes_written), compressed_buffer.size());
186     return bytes_written;
187   }
188 
189   // Compresses |input| into |output|.
Compress(std::vector<uint8> * input,std::vector<uint8> * output)190   bool Compress(std::vector<uint8>* input, std::vector<uint8>* output) {
191     DCHECK(thread_checker_.CalledOnValidThread());
192     int result = Z_OK;
193 
194     output->resize(std::max(kMinimumGzipOutputBufferSize, input->size()));
195 
196     stream_.next_in = &(*input)[0];
197     stream_.avail_in = input->size();
198     stream_.next_out = &(*output)[0];
199     stream_.avail_out = output->size();
200 
201     result = deflate(&stream_, Z_SYNC_FLUSH);
202     DCHECK_EQ(Z_OK, result);
203     DCHECK_EQ(0U, stream_.avail_in);
204 
205     output->resize(output->size() - stream_.avail_out);
206 
207     stream_.next_in = NULL;
208     stream_.next_out = NULL;
209     stream_.avail_out = 0;
210     return true;
211   }
212 
213   // Ends the compression stream and completes the dump file.
EndDumpFile()214   bool EndDumpFile() {
215     DCHECK(thread_checker_.CalledOnValidThread());
216 
217     std::vector<uint8> output_buffer;
218     output_buffer.resize(kMinimumGzipOutputBufferSize);
219 
220     stream_.next_in = NULL;
221     stream_.avail_in = 0;
222     stream_.next_out = &output_buffer[0];
223     stream_.avail_out = output_buffer.size();
224 
225     int result = deflate(&stream_, Z_FINISH);
226     DCHECK_EQ(Z_STREAM_END, result);
227 
228     result = deflateEnd(&stream_);
229     DCHECK_EQ(Z_OK, result);
230 
231     output_buffer.resize(output_buffer.size() - stream_.avail_out);
232 
233     memset(&stream_, 0, sizeof(z_stream));
234 
235     DCHECK(!output_buffer.empty());
236     int bytes_written =
237         base::AppendToFile(dump_path_,
238                            reinterpret_cast<const char*>(&output_buffer[0]),
239                            output_buffer.size());
240 
241     return bytes_written > 0;
242   }
243 
244   const base::FilePath dump_path_;
245 
246   z_stream stream_;
247 
248   base::ThreadChecker thread_checker_;
249 
250   DISALLOW_COPY_AND_ASSIGN(FileThreadWorker);
251 };
252 
WebRtcRtpDumpWriter(const base::FilePath & incoming_dump_path,const base::FilePath & outgoing_dump_path,size_t max_dump_size,const base::Closure & max_dump_size_reached_callback)253 WebRtcRtpDumpWriter::WebRtcRtpDumpWriter(
254     const base::FilePath& incoming_dump_path,
255     const base::FilePath& outgoing_dump_path,
256     size_t max_dump_size,
257     const base::Closure& max_dump_size_reached_callback)
258     : max_dump_size_(max_dump_size),
259       max_dump_size_reached_callback_(max_dump_size_reached_callback),
260       total_dump_size_on_disk_(0),
261       incoming_file_thread_worker_(new FileThreadWorker(incoming_dump_path)),
262       outgoing_file_thread_worker_(new FileThreadWorker(outgoing_dump_path)),
263       weak_ptr_factory_(this) {
264 }
265 
~WebRtcRtpDumpWriter()266 WebRtcRtpDumpWriter::~WebRtcRtpDumpWriter() {
267   DCHECK(thread_checker_.CalledOnValidThread());
268 
269   bool success = BrowserThread::DeleteSoon(
270       BrowserThread::FILE, FROM_HERE, incoming_file_thread_worker_.release());
271   DCHECK(success);
272 
273   success = BrowserThread::DeleteSoon(
274       BrowserThread::FILE, FROM_HERE, outgoing_file_thread_worker_.release());
275   DCHECK(success);
276 }
277 
WriteRtpPacket(const uint8 * packet_header,size_t header_length,size_t packet_length,bool incoming)278 void WebRtcRtpDumpWriter::WriteRtpPacket(const uint8* packet_header,
279                                          size_t header_length,
280                                          size_t packet_length,
281                                          bool incoming) {
282   DCHECK(thread_checker_.CalledOnValidThread());
283 
284   static const size_t kMaxInMemoryBufferSize = 65536;
285 
286   std::vector<uint8>* dest_buffer =
287       incoming ? &incoming_buffer_ : &outgoing_buffer_;
288 
289   // We use the capacity of the buffer to indicate if the buffer has been
290   // initialized and if the dump file header has been created.
291   if (!dest_buffer->capacity()) {
292     dest_buffer->reserve(std::min(kMaxInMemoryBufferSize, max_dump_size_));
293 
294     start_time_ = base::TimeTicks::Now();
295 
296     // Writes the dump file header.
297     AppendToBuffer(kRtpDumpFileHeaderFirstLine,
298                    arraysize(kRtpDumpFileHeaderFirstLine) - 1,
299                    dest_buffer);
300     WriteRtpDumpFileHeaderBigEndian(start_time_, dest_buffer);
301   }
302 
303   size_t packet_dump_length = kPacketDumpHeaderSize + header_length;
304 
305   // Flushes the buffer to disk if the buffer is full.
306   if (dest_buffer->size() + packet_dump_length > dest_buffer->capacity())
307     FlushBuffer(incoming, false, FlushDoneCallback());
308 
309   WritePacketDumpHeaderBigEndian(
310       start_time_, packet_dump_length, packet_length, dest_buffer);
311 
312   // Writes the actual RTP packet header.
313   AppendToBuffer(packet_header, header_length, dest_buffer);
314 }
315 
EndDump(RtpDumpType type,const EndDumpCallback & finished_callback)316 void WebRtcRtpDumpWriter::EndDump(RtpDumpType type,
317                                   const EndDumpCallback& finished_callback) {
318   DCHECK(thread_checker_.CalledOnValidThread());
319   DCHECK(type == RTP_DUMP_OUTGOING || incoming_file_thread_worker_ != NULL);
320   DCHECK(type == RTP_DUMP_INCOMING || outgoing_file_thread_worker_ != NULL);
321 
322   bool incoming = (type == RTP_DUMP_BOTH || type == RTP_DUMP_INCOMING);
323   EndDumpContext context(type, finished_callback);
324 
325   // End the incoming dump first if required. OnDumpEnded will continue to end
326   // the outgoing dump if necessary.
327   FlushBuffer(incoming,
328               true,
329               base::Bind(&WebRtcRtpDumpWriter::OnDumpEnded,
330                          weak_ptr_factory_.GetWeakPtr(),
331                          context,
332                          incoming));
333 }
334 
max_dump_size() const335 size_t WebRtcRtpDumpWriter::max_dump_size() const {
336   DCHECK(thread_checker_.CalledOnValidThread());
337   return max_dump_size_;
338 }
339 
EndDumpContext(RtpDumpType type,const EndDumpCallback & callback)340 WebRtcRtpDumpWriter::EndDumpContext::EndDumpContext(
341     RtpDumpType type,
342     const EndDumpCallback& callback)
343     : type(type),
344       incoming_succeeded(false),
345       outgoing_succeeded(false),
346       callback(callback) {
347 }
348 
~EndDumpContext()349 WebRtcRtpDumpWriter::EndDumpContext::~EndDumpContext() {
350 }
351 
FlushBuffer(bool incoming,bool end_stream,const FlushDoneCallback & callback)352 void WebRtcRtpDumpWriter::FlushBuffer(bool incoming,
353                                       bool end_stream,
354                                       const FlushDoneCallback& callback) {
355   DCHECK(thread_checker_.CalledOnValidThread());
356 
357   scoped_ptr<std::vector<uint8> > new_buffer(new std::vector<uint8>());
358 
359   if (incoming) {
360     new_buffer->reserve(incoming_buffer_.capacity());
361     new_buffer->swap(incoming_buffer_);
362   } else {
363     new_buffer->reserve(outgoing_buffer_.capacity());
364     new_buffer->swap(outgoing_buffer_);
365   }
366 
367   scoped_ptr<FlushResult> result(new FlushResult(FLUSH_RESULT_FAILURE));
368 
369   scoped_ptr<size_t> bytes_written(new size_t(0));
370 
371   FileThreadWorker* worker = incoming ? incoming_file_thread_worker_.get()
372                                       : outgoing_file_thread_worker_.get();
373 
374   // Using "Unretained(worker)" because |worker| is owner by this object and it
375   // guaranteed to be deleted on the FILE thread before this object goes away.
376   base::Closure task =
377       base::Bind(&FileThreadWorker::CompressAndWriteToFileOnFileThread,
378                  base::Unretained(worker),
379                  Passed(&new_buffer),
380                  end_stream,
381                  result.get(),
382                  bytes_written.get());
383 
384   // OnFlushDone is necessary to avoid running the callback after this
385   // object is gone.
386   base::Closure reply = base::Bind(&WebRtcRtpDumpWriter::OnFlushDone,
387                                    weak_ptr_factory_.GetWeakPtr(),
388                                    callback,
389                                    Passed(&result),
390                                    Passed(&bytes_written));
391 
392   // Define the task and reply outside the method call so that getting and
393   // passing the scoped_ptr does not depend on the argument evaluation order.
394   BrowserThread::PostTaskAndReply(BrowserThread::FILE, FROM_HERE, task, reply);
395 
396   if (end_stream) {
397     bool success = BrowserThread::DeleteSoon(
398         BrowserThread::FILE,
399         FROM_HERE,
400         incoming ? incoming_file_thread_worker_.release()
401                  : outgoing_file_thread_worker_.release());
402     DCHECK(success);
403   }
404 }
405 
OnFlushDone(const FlushDoneCallback & callback,const scoped_ptr<FlushResult> & result,const scoped_ptr<size_t> & bytes_written)406 void WebRtcRtpDumpWriter::OnFlushDone(const FlushDoneCallback& callback,
407                                       const scoped_ptr<FlushResult>& result,
408                                       const scoped_ptr<size_t>& bytes_written) {
409   DCHECK(thread_checker_.CalledOnValidThread());
410 
411   total_dump_size_on_disk_ += *bytes_written;
412 
413   if (total_dump_size_on_disk_ >= max_dump_size_ &&
414       !max_dump_size_reached_callback_.is_null()) {
415     max_dump_size_reached_callback_.Run();
416   }
417 
418   // Returns success for FLUSH_RESULT_MAX_SIZE_REACHED since the dump is still
419   // valid.
420   if (!callback.is_null()) {
421     callback.Run(*result != FLUSH_RESULT_FAILURE &&
422                  *result != FLUSH_RESULT_NO_DATA);
423   }
424 }
425 
OnDumpEnded(EndDumpContext context,bool incoming,bool success)426 void WebRtcRtpDumpWriter::OnDumpEnded(EndDumpContext context,
427                                       bool incoming,
428                                       bool success) {
429   DCHECK(thread_checker_.CalledOnValidThread());
430 
431   DVLOG(2) << "Dump ended, incoming = " << incoming
432            << ", succeeded = " << success;
433 
434   if (incoming)
435     context.incoming_succeeded = success;
436   else
437     context.outgoing_succeeded = success;
438 
439   // End the outgoing dump if needed.
440   if (incoming && context.type == RTP_DUMP_BOTH) {
441     FlushBuffer(false,
442                 true,
443                 base::Bind(&WebRtcRtpDumpWriter::OnDumpEnded,
444                            weak_ptr_factory_.GetWeakPtr(),
445                            context,
446                            false));
447     return;
448   }
449 
450   // This object might be deleted after running the callback.
451   context.callback.Run(context.incoming_succeeded, context.outgoing_succeeded);
452 }
453