• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #if defined(WEBRTC_POSIX)
12 #include <sys/file.h>
13 #endif  // WEBRTC_POSIX
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <errno.h>
17 #include <string>
18 #include "webrtc/base/basictypes.h"
19 #include "webrtc/base/common.h"
20 #include "webrtc/base/logging.h"
21 #include "webrtc/base/messagequeue.h"
22 #include "webrtc/base/stream.h"
23 #include "webrtc/base/stringencode.h"
24 #include "webrtc/base/stringutils.h"
25 #include "webrtc/base/thread.h"
26 #include "webrtc/base/timeutils.h"
27 
28 #if defined(WEBRTC_WIN)
29 #include "webrtc/base/win32.h"
30 #define fileno _fileno
31 #endif
32 
33 namespace rtc {
34 
35 ///////////////////////////////////////////////////////////////////////////////
36 // StreamInterface
37 ///////////////////////////////////////////////////////////////////////////////
~StreamInterface()38 StreamInterface::~StreamInterface() {
39 }
40 
WriteAll(const void * data,size_t data_len,size_t * written,int * error)41 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
42                                        size_t* written, int* error) {
43   StreamResult result = SR_SUCCESS;
44   size_t total_written = 0, current_written;
45   while (total_written < data_len) {
46     result = Write(static_cast<const char*>(data) + total_written,
47                    data_len - total_written, &current_written, error);
48     if (result != SR_SUCCESS)
49       break;
50     total_written += current_written;
51   }
52   if (written)
53     *written = total_written;
54   return result;
55 }
56 
ReadAll(void * buffer,size_t buffer_len,size_t * read,int * error)57 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
58                                       size_t* read, int* error) {
59   StreamResult result = SR_SUCCESS;
60   size_t total_read = 0, current_read;
61   while (total_read < buffer_len) {
62     result = Read(static_cast<char*>(buffer) + total_read,
63                   buffer_len - total_read, &current_read, error);
64     if (result != SR_SUCCESS)
65       break;
66     total_read += current_read;
67   }
68   if (read)
69     *read = total_read;
70   return result;
71 }
72 
ReadLine(std::string * line)73 StreamResult StreamInterface::ReadLine(std::string* line) {
74   line->clear();
75   StreamResult result = SR_SUCCESS;
76   while (true) {
77     char ch;
78     result = Read(&ch, sizeof(ch), NULL, NULL);
79     if (result != SR_SUCCESS) {
80       break;
81     }
82     if (ch == '\n') {
83       break;
84     }
85     line->push_back(ch);
86   }
87   if (!line->empty()) {   // give back the line we've collected so far with
88     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
89   }
90   return result;
91 }
92 
PostEvent(Thread * t,int events,int err)93 void StreamInterface::PostEvent(Thread* t, int events, int err) {
94   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
95 }
96 
PostEvent(int events,int err)97 void StreamInterface::PostEvent(int events, int err) {
98   PostEvent(Thread::Current(), events, err);
99 }
100 
StreamInterface()101 StreamInterface::StreamInterface() {
102 }
103 
OnMessage(Message * msg)104 void StreamInterface::OnMessage(Message* msg) {
105   if (MSG_POST_EVENT == msg->message_id) {
106     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
107     SignalEvent(this, pe->events, pe->error);
108     delete msg->pdata;
109   }
110 }
111 
112 ///////////////////////////////////////////////////////////////////////////////
113 // StreamAdapterInterface
114 ///////////////////////////////////////////////////////////////////////////////
115 
StreamAdapterInterface(StreamInterface * stream,bool owned)116 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
117                                                bool owned)
118     : stream_(stream), owned_(owned) {
119   if (NULL != stream_)
120     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
121 }
122 
Attach(StreamInterface * stream,bool owned)123 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
124   if (NULL != stream_)
125     stream_->SignalEvent.disconnect(this);
126   if (owned_)
127     delete stream_;
128   stream_ = stream;
129   owned_ = owned;
130   if (NULL != stream_)
131     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
132 }
133 
Detach()134 StreamInterface* StreamAdapterInterface::Detach() {
135   if (NULL != stream_)
136     stream_->SignalEvent.disconnect(this);
137   StreamInterface* stream = stream_;
138   stream_ = NULL;
139   return stream;
140 }
141 
~StreamAdapterInterface()142 StreamAdapterInterface::~StreamAdapterInterface() {
143   if (owned_)
144     delete stream_;
145 }
146 
147 ///////////////////////////////////////////////////////////////////////////////
148 // StreamTap
149 ///////////////////////////////////////////////////////////////////////////////
150 
StreamTap(StreamInterface * stream,StreamInterface * tap)151 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
152     : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
153         tap_error_(0) {
154   AttachTap(tap);
155 }
156 
AttachTap(StreamInterface * tap)157 void StreamTap::AttachTap(StreamInterface* tap) {
158   tap_.reset(tap);
159 }
160 
DetachTap()161 StreamInterface* StreamTap::DetachTap() {
162   return tap_.release();
163 }
164 
GetTapResult(int * error)165 StreamResult StreamTap::GetTapResult(int* error) {
166   if (error) {
167     *error = tap_error_;
168   }
169   return tap_result_;
170 }
171 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)172 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
173                              size_t* read, int* error) {
174   size_t backup_read;
175   if (!read) {
176     read = &backup_read;
177   }
178   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
179                                                   read, error);
180   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
181     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
182   }
183   return res;
184 }
185 
Write(const void * data,size_t data_len,size_t * written,int * error)186 StreamResult StreamTap::Write(const void* data, size_t data_len,
187                               size_t* written, int* error) {
188   size_t backup_written;
189   if (!written) {
190     written = &backup_written;
191   }
192   StreamResult res = StreamAdapterInterface::Write(data, data_len,
193                                                    written, error);
194   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
195     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
196   }
197   return res;
198 }
199 
200 ///////////////////////////////////////////////////////////////////////////////
201 // StreamSegment
202 ///////////////////////////////////////////////////////////////////////////////
203 
StreamSegment(StreamInterface * stream)204 StreamSegment::StreamSegment(StreamInterface* stream)
205     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
206     length_(SIZE_UNKNOWN) {
207   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
208   stream->GetPosition(&start_);
209 }
210 
StreamSegment(StreamInterface * stream,size_t length)211 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
212     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
213     length_(length) {
214   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
215   stream->GetPosition(&start_);
216 }
217 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)218 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
219                                  size_t* read, int* error) {
220   if (SIZE_UNKNOWN != length_) {
221     if (pos_ >= length_)
222       return SR_EOS;
223     buffer_len = _min(buffer_len, length_ - pos_);
224   }
225   size_t backup_read;
226   if (!read) {
227     read = &backup_read;
228   }
229   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
230                                                      read, error);
231   if (SR_SUCCESS == result) {
232     pos_ += *read;
233   }
234   return result;
235 }
236 
SetPosition(size_t position)237 bool StreamSegment::SetPosition(size_t position) {
238   if (SIZE_UNKNOWN == start_)
239     return false;  // Not seekable
240   if ((SIZE_UNKNOWN != length_) && (position > length_))
241     return false;  // Seek past end of segment
242   if (!StreamAdapterInterface::SetPosition(start_ + position))
243     return false;
244   pos_ = position;
245   return true;
246 }
247 
GetPosition(size_t * position) const248 bool StreamSegment::GetPosition(size_t* position) const {
249   if (SIZE_UNKNOWN == start_)
250     return false;  // Not seekable
251   if (!StreamAdapterInterface::GetPosition(position))
252     return false;
253   if (position) {
254     ASSERT(*position >= start_);
255     *position -= start_;
256   }
257   return true;
258 }
259 
GetSize(size_t * size) const260 bool StreamSegment::GetSize(size_t* size) const {
261   if (!StreamAdapterInterface::GetSize(size))
262     return false;
263   if (size) {
264     if (SIZE_UNKNOWN != start_) {
265       ASSERT(*size >= start_);
266       *size -= start_;
267     }
268     if (SIZE_UNKNOWN != length_) {
269       *size = _min(*size, length_);
270     }
271   }
272   return true;
273 }
274 
GetAvailable(size_t * size) const275 bool StreamSegment::GetAvailable(size_t* size) const {
276   if (!StreamAdapterInterface::GetAvailable(size))
277     return false;
278   if (size && (SIZE_UNKNOWN != length_))
279     *size = _min(*size, length_ - pos_);
280   return true;
281 }
282 
283 ///////////////////////////////////////////////////////////////////////////////
284 // NullStream
285 ///////////////////////////////////////////////////////////////////////////////
286 
NullStream()287 NullStream::NullStream() {
288 }
289 
~NullStream()290 NullStream::~NullStream() {
291 }
292 
GetState() const293 StreamState NullStream::GetState() const {
294   return SS_OPEN;
295 }
296 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)297 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
298                               size_t* read, int* error) {
299   if (error) *error = -1;
300   return SR_ERROR;
301 }
302 
Write(const void * data,size_t data_len,size_t * written,int * error)303 StreamResult NullStream::Write(const void* data, size_t data_len,
304                                size_t* written, int* error) {
305   if (written) *written = data_len;
306   return SR_SUCCESS;
307 }
308 
Close()309 void NullStream::Close() {
310 }
311 
312 ///////////////////////////////////////////////////////////////////////////////
313 // FileStream
314 ///////////////////////////////////////////////////////////////////////////////
315 
FileStream()316 FileStream::FileStream() : file_(NULL) {
317 }
318 
~FileStream()319 FileStream::~FileStream() {
320   FileStream::Close();
321 }
322 
Open(const std::string & filename,const char * mode,int * error)323 bool FileStream::Open(const std::string& filename, const char* mode,
324                       int* error) {
325   Close();
326 #if defined(WEBRTC_WIN)
327   std::wstring wfilename;
328   if (Utf8ToWindowsFilename(filename, &wfilename)) {
329     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
330   } else {
331     if (error) {
332       *error = -1;
333       return false;
334     }
335   }
336 #else
337   file_ = fopen(filename.c_str(), mode);
338 #endif
339   if (!file_ && error) {
340     *error = errno;
341   }
342   return (file_ != NULL);
343 }
344 
OpenShare(const std::string & filename,const char * mode,int shflag,int * error)345 bool FileStream::OpenShare(const std::string& filename, const char* mode,
346                            int shflag, int* error) {
347   Close();
348 #if defined(WEBRTC_WIN)
349   std::wstring wfilename;
350   if (Utf8ToWindowsFilename(filename, &wfilename)) {
351     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
352     if (!file_ && error) {
353       *error = errno;
354       return false;
355     }
356     return file_ != NULL;
357   } else {
358     if (error) {
359       *error = -1;
360     }
361     return false;
362   }
363 #else
364   return Open(filename, mode, error);
365 #endif
366 }
367 
DisableBuffering()368 bool FileStream::DisableBuffering() {
369   if (!file_)
370     return false;
371   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
372 }
373 
GetState() const374 StreamState FileStream::GetState() const {
375   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
376 }
377 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)378 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
379                               size_t* read, int* error) {
380   if (!file_)
381     return SR_EOS;
382   size_t result = fread(buffer, 1, buffer_len, file_);
383   if ((result == 0) && (buffer_len > 0)) {
384     if (feof(file_))
385       return SR_EOS;
386     if (error)
387       *error = errno;
388     return SR_ERROR;
389   }
390   if (read)
391     *read = result;
392   return SR_SUCCESS;
393 }
394 
Write(const void * data,size_t data_len,size_t * written,int * error)395 StreamResult FileStream::Write(const void* data, size_t data_len,
396                                size_t* written, int* error) {
397   if (!file_)
398     return SR_EOS;
399   size_t result = fwrite(data, 1, data_len, file_);
400   if ((result == 0) && (data_len > 0)) {
401     if (error)
402       *error = errno;
403     return SR_ERROR;
404   }
405   if (written)
406     *written = result;
407   return SR_SUCCESS;
408 }
409 
Close()410 void FileStream::Close() {
411   if (file_) {
412     DoClose();
413     file_ = NULL;
414   }
415 }
416 
SetPosition(size_t position)417 bool FileStream::SetPosition(size_t position) {
418   if (!file_)
419     return false;
420   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
421 }
422 
GetPosition(size_t * position) const423 bool FileStream::GetPosition(size_t* position) const {
424   ASSERT(NULL != position);
425   if (!file_)
426     return false;
427   long result = ftell(file_);
428   if (result < 0)
429     return false;
430   if (position)
431     *position = result;
432   return true;
433 }
434 
GetSize(size_t * size) const435 bool FileStream::GetSize(size_t* size) const {
436   ASSERT(NULL != size);
437   if (!file_)
438     return false;
439   struct stat file_stats;
440   if (fstat(fileno(file_), &file_stats) != 0)
441     return false;
442   if (size)
443     *size = file_stats.st_size;
444   return true;
445 }
446 
GetAvailable(size_t * size) const447 bool FileStream::GetAvailable(size_t* size) const {
448   ASSERT(NULL != size);
449   if (!GetSize(size))
450     return false;
451   long result = ftell(file_);
452   if (result < 0)
453     return false;
454   if (size)
455     *size -= result;
456   return true;
457 }
458 
ReserveSize(size_t size)459 bool FileStream::ReserveSize(size_t size) {
460   // TODO: extend the file to the proper length
461   return true;
462 }
463 
GetSize(const std::string & filename,size_t * size)464 bool FileStream::GetSize(const std::string& filename, size_t* size) {
465   struct stat file_stats;
466   if (stat(filename.c_str(), &file_stats) != 0)
467     return false;
468   *size = file_stats.st_size;
469   return true;
470 }
471 
Flush()472 bool FileStream::Flush() {
473   if (file_) {
474     return (0 == fflush(file_));
475   }
476   // try to flush empty file?
477   ASSERT(false);
478   return false;
479 }
480 
481 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
482 
TryLock()483 bool FileStream::TryLock() {
484   if (file_ == NULL) {
485     // Stream not open.
486     ASSERT(false);
487     return false;
488   }
489 
490   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
491 }
492 
Unlock()493 bool FileStream::Unlock() {
494   if (file_ == NULL) {
495     // Stream not open.
496     ASSERT(false);
497     return false;
498   }
499 
500   return flock(fileno(file_), LOCK_UN) == 0;
501 }
502 
503 #endif
504 
DoClose()505 void FileStream::DoClose() {
506   fclose(file_);
507 }
508 
CircularFileStream(size_t max_size)509 CircularFileStream::CircularFileStream(size_t max_size)
510   : max_write_size_(max_size),
511     position_(0),
512     marked_position_(max_size / 2),
513     last_write_position_(0),
514     read_segment_(READ_LATEST),
515     read_segment_available_(0) {
516 }
517 
Open(const std::string & filename,const char * mode,int * error)518 bool CircularFileStream::Open(
519     const std::string& filename, const char* mode, int* error) {
520   if (!FileStream::Open(filename.c_str(), mode, error))
521     return false;
522 
523   if (strchr(mode, "r") != NULL) {  // Opened in read mode.
524     // Check if the buffer has been overwritten and determine how to read the
525     // log in time sequence.
526     size_t file_size;
527     GetSize(&file_size);
528     if (file_size == position_) {
529       // The buffer has not been overwritten yet. Read 0 .. file_size
530       read_segment_ = READ_LATEST;
531       read_segment_available_ = file_size;
532     } else {
533       // The buffer has been over written. There are three segments: The first
534       // one is 0 .. marked_position_, which is the marked earliest log. The
535       // second one is position_ .. file_size, which is the middle log. The
536       // last one is marked_position_ .. position_, which is the latest log.
537       read_segment_ = READ_MARKED;
538       read_segment_available_ = marked_position_;
539       last_write_position_ = position_;
540     }
541 
542     // Read from the beginning.
543     position_ = 0;
544     SetPosition(position_);
545   }
546 
547   return true;
548 }
549 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)550 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
551                                       size_t* read, int* error) {
552   if (read_segment_available_ == 0) {
553     size_t file_size;
554     switch (read_segment_) {
555       case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
556         read_segment_ = READ_MIDDLE;
557         position_ = last_write_position_;
558         SetPosition(position_);
559         GetSize(&file_size);
560         read_segment_available_ = file_size - position_;
561         break;
562 
563       case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
564         read_segment_ = READ_LATEST;
565         position_ = marked_position_;
566         SetPosition(position_);
567         read_segment_available_ = last_write_position_ - position_;
568         break;
569 
570       default:  // Finished READ_LATEST and return EOS.
571         return rtc::SR_EOS;
572     }
573   }
574 
575   size_t local_read;
576   if (!read) read = &local_read;
577 
578   size_t to_read = rtc::_min(buffer_len, read_segment_available_);
579   rtc::StreamResult result
580     = rtc::FileStream::Read(buffer, to_read, read, error);
581   if (result == rtc::SR_SUCCESS) {
582     read_segment_available_ -= *read;
583     position_ += *read;
584   }
585   return result;
586 }
587 
Write(const void * data,size_t data_len,size_t * written,int * error)588 StreamResult CircularFileStream::Write(const void* data, size_t data_len,
589                                        size_t* written, int* error) {
590   if (position_ >= max_write_size_) {
591     ASSERT(position_ == max_write_size_);
592     position_ = marked_position_;
593     SetPosition(position_);
594   }
595 
596   size_t local_written;
597   if (!written) written = &local_written;
598 
599   size_t to_eof = max_write_size_ - position_;
600   size_t to_write = rtc::_min(data_len, to_eof);
601   rtc::StreamResult result
602     = rtc::FileStream::Write(data, to_write, written, error);
603   if (result == rtc::SR_SUCCESS) {
604     position_ += *written;
605   }
606   return result;
607 }
608 
~AsyncWriteStream()609 AsyncWriteStream::~AsyncWriteStream() {
610   write_thread_->Clear(this, 0, NULL);
611   ClearBufferAndWrite();
612 
613   CritScope cs(&crit_stream_);
614   stream_.reset();
615 }
616 
617 // This is needed by some stream writers, such as RtpDumpWriter.
GetPosition(size_t * position) const618 bool AsyncWriteStream::GetPosition(size_t* position) const {
619   CritScope cs(&crit_stream_);
620   return stream_->GetPosition(position);
621 }
622 
623 // This is needed by some stream writers, such as the plugin log writers.
Read(void * buffer,size_t buffer_len,size_t * read,int * error)624 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
625                                     size_t* read, int* error) {
626   CritScope cs(&crit_stream_);
627   return stream_->Read(buffer, buffer_len, read, error);
628 }
629 
Close()630 void AsyncWriteStream::Close() {
631   if (state_ == SS_CLOSED) {
632     return;
633   }
634 
635   write_thread_->Clear(this, 0, NULL);
636   ClearBufferAndWrite();
637 
638   CritScope cs(&crit_stream_);
639   stream_->Close();
640   state_ = SS_CLOSED;
641 }
642 
Write(const void * data,size_t data_len,size_t * written,int * error)643 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
644                                      size_t* written, int* error) {
645   if (state_ == SS_CLOSED) {
646     return SR_ERROR;
647   }
648 
649   size_t previous_buffer_length = 0;
650   {
651     CritScope cs(&crit_buffer_);
652     previous_buffer_length = buffer_.length();
653     buffer_.AppendData(data, data_len);
654   }
655 
656   if (previous_buffer_length == 0) {
657     // If there's stuff already in the buffer, then we already called
658     // Post and the write_thread_ hasn't pulled it out yet, so we
659     // don't need to re-Post.
660     write_thread_->Post(this, 0, NULL);
661   }
662   // Return immediately, assuming that it works.
663   if (written) {
664     *written = data_len;
665   }
666   return SR_SUCCESS;
667 }
668 
OnMessage(rtc::Message * pmsg)669 void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
670   ClearBufferAndWrite();
671 }
672 
Flush()673 bool AsyncWriteStream::Flush() {
674   if (state_ == SS_CLOSED) {
675     return false;
676   }
677 
678   ClearBufferAndWrite();
679 
680   CritScope cs(&crit_stream_);
681   return stream_->Flush();
682 }
683 
ClearBufferAndWrite()684 void AsyncWriteStream::ClearBufferAndWrite() {
685   Buffer to_write;
686   {
687     CritScope cs_buffer(&crit_buffer_);
688     buffer_.TransferTo(&to_write);
689   }
690 
691   if (to_write.length() > 0) {
692     CritScope cs(&crit_stream_);
693     stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
694   }
695 }
696 
697 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
698 
699 // Have to identically rewrite the FileStream destructor or else it would call
700 // the base class's Close() instead of the sub-class's.
~POpenStream()701 POpenStream::~POpenStream() {
702   POpenStream::Close();
703 }
704 
Open(const std::string & subcommand,const char * mode,int * error)705 bool POpenStream::Open(const std::string& subcommand,
706                        const char* mode,
707                        int* error) {
708   Close();
709   file_ = popen(subcommand.c_str(), mode);
710   if (file_ == NULL) {
711     if (error)
712       *error = errno;
713     return false;
714   }
715   return true;
716 }
717 
OpenShare(const std::string & subcommand,const char * mode,int shflag,int * error)718 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
719                             int shflag, int* error) {
720   return Open(subcommand, mode, error);
721 }
722 
DoClose()723 void POpenStream::DoClose() {
724   wait_status_ = pclose(file_);
725 }
726 
727 #endif
728 
729 ///////////////////////////////////////////////////////////////////////////////
730 // MemoryStream
731 ///////////////////////////////////////////////////////////////////////////////
732 
MemoryStreamBase()733 MemoryStreamBase::MemoryStreamBase()
734   : buffer_(NULL), buffer_length_(0), data_length_(0),
735     seek_position_(0) {
736 }
737 
GetState() const738 StreamState MemoryStreamBase::GetState() const {
739   return SS_OPEN;
740 }
741 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)742 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
743                                     size_t* bytes_read, int* error) {
744   if (seek_position_ >= data_length_) {
745     return SR_EOS;
746   }
747   size_t available = data_length_ - seek_position_;
748   if (bytes > available) {
749     // Read partial buffer
750     bytes = available;
751   }
752   memcpy(buffer, &buffer_[seek_position_], bytes);
753   seek_position_ += bytes;
754   if (bytes_read) {
755     *bytes_read = bytes;
756   }
757   return SR_SUCCESS;
758 }
759 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)760 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
761                                      size_t* bytes_written, int* error) {
762   size_t available = buffer_length_ - seek_position_;
763   if (0 == available) {
764     // Increase buffer size to the larger of:
765     // a) new position rounded up to next 256 bytes
766     // b) double the previous length
767     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
768                                     buffer_length_ * 2);
769     StreamResult result = DoReserve(new_buffer_length, error);
770     if (SR_SUCCESS != result) {
771       return result;
772     }
773     ASSERT(buffer_length_ >= new_buffer_length);
774     available = buffer_length_ - seek_position_;
775   }
776 
777   if (bytes > available) {
778     bytes = available;
779   }
780   memcpy(&buffer_[seek_position_], buffer, bytes);
781   seek_position_ += bytes;
782   if (data_length_ < seek_position_) {
783     data_length_ = seek_position_;
784   }
785   if (bytes_written) {
786     *bytes_written = bytes;
787   }
788   return SR_SUCCESS;
789 }
790 
Close()791 void MemoryStreamBase::Close() {
792   // nothing to do
793 }
794 
SetPosition(size_t position)795 bool MemoryStreamBase::SetPosition(size_t position) {
796   if (position > data_length_)
797     return false;
798   seek_position_ = position;
799   return true;
800 }
801 
GetPosition(size_t * position) const802 bool MemoryStreamBase::GetPosition(size_t* position) const {
803   if (position)
804     *position = seek_position_;
805   return true;
806 }
807 
GetSize(size_t * size) const808 bool MemoryStreamBase::GetSize(size_t* size) const {
809   if (size)
810     *size = data_length_;
811   return true;
812 }
813 
GetAvailable(size_t * size) const814 bool MemoryStreamBase::GetAvailable(size_t* size) const {
815   if (size)
816     *size = data_length_ - seek_position_;
817   return true;
818 }
819 
ReserveSize(size_t size)820 bool MemoryStreamBase::ReserveSize(size_t size) {
821   return (SR_SUCCESS == DoReserve(size, NULL));
822 }
823 
DoReserve(size_t size,int * error)824 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
825   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
826 }
827 
828 ///////////////////////////////////////////////////////////////////////////////
829 
MemoryStream()830 MemoryStream::MemoryStream()
831   : buffer_alloc_(NULL) {
832 }
833 
MemoryStream(const char * data)834 MemoryStream::MemoryStream(const char* data)
835   : buffer_alloc_(NULL) {
836   SetData(data, strlen(data));
837 }
838 
MemoryStream(const void * data,size_t length)839 MemoryStream::MemoryStream(const void* data, size_t length)
840   : buffer_alloc_(NULL) {
841   SetData(data, length);
842 }
843 
~MemoryStream()844 MemoryStream::~MemoryStream() {
845   delete [] buffer_alloc_;
846 }
847 
SetData(const void * data,size_t length)848 void MemoryStream::SetData(const void* data, size_t length) {
849   data_length_ = buffer_length_ = length;
850   delete [] buffer_alloc_;
851   buffer_alloc_ = new char[buffer_length_ + kAlignment];
852   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
853   memcpy(buffer_, data, data_length_);
854   seek_position_ = 0;
855 }
856 
DoReserve(size_t size,int * error)857 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
858   if (buffer_length_ >= size)
859     return SR_SUCCESS;
860 
861   if (char* new_buffer_alloc = new char[size + kAlignment]) {
862     char* new_buffer = reinterpret_cast<char*>(
863         ALIGNP(new_buffer_alloc, kAlignment));
864     memcpy(new_buffer, buffer_, data_length_);
865     delete [] buffer_alloc_;
866     buffer_alloc_ = new_buffer_alloc;
867     buffer_ = new_buffer;
868     buffer_length_ = size;
869     return SR_SUCCESS;
870   }
871 
872   if (error) {
873     *error = ENOMEM;
874   }
875   return SR_ERROR;
876 }
877 
878 ///////////////////////////////////////////////////////////////////////////////
879 
ExternalMemoryStream()880 ExternalMemoryStream::ExternalMemoryStream() {
881 }
882 
ExternalMemoryStream(void * data,size_t length)883 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
884   SetData(data, length);
885 }
886 
~ExternalMemoryStream()887 ExternalMemoryStream::~ExternalMemoryStream() {
888 }
889 
SetData(void * data,size_t length)890 void ExternalMemoryStream::SetData(void* data, size_t length) {
891   data_length_ = buffer_length_ = length;
892   buffer_ = static_cast<char*>(data);
893   seek_position_ = 0;
894 }
895 
896 ///////////////////////////////////////////////////////////////////////////////
897 // FifoBuffer
898 ///////////////////////////////////////////////////////////////////////////////
899 
FifoBuffer(size_t size)900 FifoBuffer::FifoBuffer(size_t size)
901     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
902       data_length_(0), read_position_(0), owner_(Thread::Current()) {
903   // all events are done on the owner_ thread
904 }
905 
FifoBuffer(size_t size,Thread * owner)906 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
907     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
908       data_length_(0), read_position_(0), owner_(owner) {
909   // all events are done on the owner_ thread
910 }
911 
~FifoBuffer()912 FifoBuffer::~FifoBuffer() {
913 }
914 
GetBuffered(size_t * size) const915 bool FifoBuffer::GetBuffered(size_t* size) const {
916   CritScope cs(&crit_);
917   *size = data_length_;
918   return true;
919 }
920 
SetCapacity(size_t size)921 bool FifoBuffer::SetCapacity(size_t size) {
922   CritScope cs(&crit_);
923   if (data_length_ > size) {
924     return false;
925   }
926 
927   if (size != buffer_length_) {
928     char* buffer = new char[size];
929     const size_t copy = data_length_;
930     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
931     memcpy(buffer, &buffer_[read_position_], tail_copy);
932     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
933     buffer_.reset(buffer);
934     read_position_ = 0;
935     buffer_length_ = size;
936   }
937   return true;
938 }
939 
ReadOffset(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)940 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
941                                     size_t offset, size_t* bytes_read) {
942   CritScope cs(&crit_);
943   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
944 }
945 
WriteOffset(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)946 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
947                                      size_t offset, size_t* bytes_written) {
948   CritScope cs(&crit_);
949   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
950 }
951 
GetState() const952 StreamState FifoBuffer::GetState() const {
953   return state_;
954 }
955 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)956 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
957                               size_t* bytes_read, int* error) {
958   CritScope cs(&crit_);
959   const bool was_writable = data_length_ < buffer_length_;
960   size_t copy = 0;
961   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
962 
963   if (result == SR_SUCCESS) {
964     // If read was successful then adjust the read position and number of
965     // bytes buffered.
966     read_position_ = (read_position_ + copy) % buffer_length_;
967     data_length_ -= copy;
968     if (bytes_read) {
969       *bytes_read = copy;
970     }
971 
972     // if we were full before, and now we're not, post an event
973     if (!was_writable && copy > 0) {
974       PostEvent(owner_, SE_WRITE, 0);
975     }
976   }
977   return result;
978 }
979 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)980 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
981                                size_t* bytes_written, int* error) {
982   CritScope cs(&crit_);
983 
984   const bool was_readable = (data_length_ > 0);
985   size_t copy = 0;
986   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
987 
988   if (result == SR_SUCCESS) {
989     // If write was successful then adjust the number of readable bytes.
990     data_length_ += copy;
991     if (bytes_written) {
992       *bytes_written = copy;
993     }
994 
995     // if we didn't have any data to read before, and now we do, post an event
996     if (!was_readable && copy > 0) {
997       PostEvent(owner_, SE_READ, 0);
998     }
999   }
1000   return result;
1001 }
1002 
Close()1003 void FifoBuffer::Close() {
1004   CritScope cs(&crit_);
1005   state_ = SS_CLOSED;
1006 }
1007 
GetReadData(size_t * size)1008 const void* FifoBuffer::GetReadData(size_t* size) {
1009   CritScope cs(&crit_);
1010   *size = (read_position_ + data_length_ <= buffer_length_) ?
1011       data_length_ : buffer_length_ - read_position_;
1012   return &buffer_[read_position_];
1013 }
1014 
ConsumeReadData(size_t size)1015 void FifoBuffer::ConsumeReadData(size_t size) {
1016   CritScope cs(&crit_);
1017   ASSERT(size <= data_length_);
1018   const bool was_writable = data_length_ < buffer_length_;
1019   read_position_ = (read_position_ + size) % buffer_length_;
1020   data_length_ -= size;
1021   if (!was_writable && size > 0) {
1022     PostEvent(owner_, SE_WRITE, 0);
1023   }
1024 }
1025 
GetWriteBuffer(size_t * size)1026 void* FifoBuffer::GetWriteBuffer(size_t* size) {
1027   CritScope cs(&crit_);
1028   if (state_ == SS_CLOSED) {
1029     return NULL;
1030   }
1031 
1032   // if empty, reset the write position to the beginning, so we can get
1033   // the biggest possible block
1034   if (data_length_ == 0) {
1035     read_position_ = 0;
1036   }
1037 
1038   const size_t write_position = (read_position_ + data_length_)
1039       % buffer_length_;
1040   *size = (write_position > read_position_ || data_length_ == 0) ?
1041       buffer_length_ - write_position : read_position_ - write_position;
1042   return &buffer_[write_position];
1043 }
1044 
ConsumeWriteBuffer(size_t size)1045 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1046   CritScope cs(&crit_);
1047   ASSERT(size <= buffer_length_ - data_length_);
1048   const bool was_readable = (data_length_ > 0);
1049   data_length_ += size;
1050   if (!was_readable && size > 0) {
1051     PostEvent(owner_, SE_READ, 0);
1052   }
1053 }
1054 
GetWriteRemaining(size_t * size) const1055 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1056   CritScope cs(&crit_);
1057   *size = buffer_length_ - data_length_;
1058   return true;
1059 }
1060 
ReadOffsetLocked(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)1061 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1062                                           size_t bytes,
1063                                           size_t offset,
1064                                           size_t* bytes_read) {
1065   if (offset >= data_length_) {
1066     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1067   }
1068 
1069   const size_t available = data_length_ - offset;
1070   const size_t read_position = (read_position_ + offset) % buffer_length_;
1071   const size_t copy = _min(bytes, available);
1072   const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1073   char* const p = static_cast<char*>(buffer);
1074   memcpy(p, &buffer_[read_position], tail_copy);
1075   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1076 
1077   if (bytes_read) {
1078     *bytes_read = copy;
1079   }
1080   return SR_SUCCESS;
1081 }
1082 
WriteOffsetLocked(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)1083 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1084                                            size_t bytes,
1085                                            size_t offset,
1086                                            size_t* bytes_written) {
1087   if (state_ == SS_CLOSED) {
1088     return SR_EOS;
1089   }
1090 
1091   if (data_length_ + offset >= buffer_length_) {
1092     return SR_BLOCK;
1093   }
1094 
1095   const size_t available = buffer_length_ - data_length_ - offset;
1096   const size_t write_position = (read_position_ + data_length_ + offset)
1097       % buffer_length_;
1098   const size_t copy = _min(bytes, available);
1099   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1100   const char* const p = static_cast<const char*>(buffer);
1101   memcpy(&buffer_[write_position], p, tail_copy);
1102   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1103 
1104   if (bytes_written) {
1105     *bytes_written = copy;
1106   }
1107   return SR_SUCCESS;
1108 }
1109 
1110 
1111 
1112 ///////////////////////////////////////////////////////////////////////////////
1113 // LoggingAdapter
1114 ///////////////////////////////////////////////////////////////////////////////
1115 
LoggingAdapter(StreamInterface * stream,LoggingSeverity level,const std::string & label,bool hex_mode)1116 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1117                                const std::string& label, bool hex_mode)
1118     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1119   set_label(label);
1120 }
1121 
set_label(const std::string & label)1122 void LoggingAdapter::set_label(const std::string& label) {
1123   label_.assign("[");
1124   label_.append(label);
1125   label_.append("]");
1126 }
1127 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)1128 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1129                                   size_t* read, int* error) {
1130   size_t local_read; if (!read) read = &local_read;
1131   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1132                                                      error);
1133   if (result == SR_SUCCESS) {
1134     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1135   }
1136   return result;
1137 }
1138 
Write(const void * data,size_t data_len,size_t * written,int * error)1139 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1140                                    size_t* written, int* error) {
1141   size_t local_written;
1142   if (!written) written = &local_written;
1143   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1144                                                       error);
1145   if (result == SR_SUCCESS) {
1146     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1147                  &lms_);
1148   }
1149   return result;
1150 }
1151 
Close()1152 void LoggingAdapter::Close() {
1153   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1154   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1155   LOG_V(level_) << label_ << " Closed locally";
1156   StreamAdapterInterface::Close();
1157 }
1158 
OnEvent(StreamInterface * stream,int events,int err)1159 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1160   if (events & SE_OPEN) {
1161     LOG_V(level_) << label_ << " Open";
1162   } else if (events & SE_CLOSE) {
1163     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1164     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1165     LOG_V(level_) << label_ << " Closed with error: " << err;
1166   }
1167   StreamAdapterInterface::OnEvent(stream, events, err);
1168 }
1169 
1170 ///////////////////////////////////////////////////////////////////////////////
1171 // StringStream - Reads/Writes to an external std::string
1172 ///////////////////////////////////////////////////////////////////////////////
1173 
StringStream(std::string & str)1174 StringStream::StringStream(std::string& str)
1175     : str_(str), read_pos_(0), read_only_(false) {
1176 }
1177 
StringStream(const std::string & str)1178 StringStream::StringStream(const std::string& str)
1179     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1180 }
1181 
GetState() const1182 StreamState StringStream::GetState() const {
1183   return SS_OPEN;
1184 }
1185 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)1186 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1187                                       size_t* read, int* error) {
1188   size_t available = _min(buffer_len, str_.size() - read_pos_);
1189   if (!available)
1190     return SR_EOS;
1191   memcpy(buffer, str_.data() + read_pos_, available);
1192   read_pos_ += available;
1193   if (read)
1194     *read = available;
1195   return SR_SUCCESS;
1196 }
1197 
Write(const void * data,size_t data_len,size_t * written,int * error)1198 StreamResult StringStream::Write(const void* data, size_t data_len,
1199                                       size_t* written, int* error) {
1200   if (read_only_) {
1201     if (error) {
1202       *error = -1;
1203     }
1204     return SR_ERROR;
1205   }
1206   str_.append(static_cast<const char*>(data),
1207               static_cast<const char*>(data) + data_len);
1208   if (written)
1209     *written = data_len;
1210   return SR_SUCCESS;
1211 }
1212 
Close()1213 void StringStream::Close() {
1214 }
1215 
SetPosition(size_t position)1216 bool StringStream::SetPosition(size_t position) {
1217   if (position > str_.size())
1218     return false;
1219   read_pos_ = position;
1220   return true;
1221 }
1222 
GetPosition(size_t * position) const1223 bool StringStream::GetPosition(size_t* position) const {
1224   if (position)
1225     *position = read_pos_;
1226   return true;
1227 }
1228 
GetSize(size_t * size) const1229 bool StringStream::GetSize(size_t* size) const {
1230   if (size)
1231     *size = str_.size();
1232   return true;
1233 }
1234 
GetAvailable(size_t * size) const1235 bool StringStream::GetAvailable(size_t* size) const {
1236   if (size)
1237     *size = str_.size() - read_pos_;
1238   return true;
1239 }
1240 
ReserveSize(size_t size)1241 bool StringStream::ReserveSize(size_t size) {
1242   if (read_only_)
1243     return false;
1244   str_.reserve(size);
1245   return true;
1246 }
1247 
1248 ///////////////////////////////////////////////////////////////////////////////
1249 // StreamReference
1250 ///////////////////////////////////////////////////////////////////////////////
1251 
StreamReference(StreamInterface * stream)1252 StreamReference::StreamReference(StreamInterface* stream)
1253     : StreamAdapterInterface(stream, false) {
1254   // owner set to false so the destructor does not free the stream.
1255   stream_ref_count_ = new StreamRefCount(stream);
1256 }
1257 
NewReference()1258 StreamInterface* StreamReference::NewReference() {
1259   stream_ref_count_->AddReference();
1260   return new StreamReference(stream_ref_count_, stream());
1261 }
1262 
~StreamReference()1263 StreamReference::~StreamReference() {
1264   stream_ref_count_->Release();
1265 }
1266 
StreamReference(StreamRefCount * stream_ref_count,StreamInterface * stream)1267 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1268                                  StreamInterface* stream)
1269     : StreamAdapterInterface(stream, false),
1270       stream_ref_count_(stream_ref_count) {
1271 }
1272 
1273 ///////////////////////////////////////////////////////////////////////////////
1274 
Flow(StreamInterface * source,char * buffer,size_t buffer_len,StreamInterface * sink,size_t * data_len)1275 StreamResult Flow(StreamInterface* source,
1276                   char* buffer, size_t buffer_len,
1277                   StreamInterface* sink,
1278                   size_t* data_len /* = NULL */) {
1279   ASSERT(buffer_len > 0);
1280 
1281   StreamResult result;
1282   size_t count, read_pos, write_pos;
1283   if (data_len) {
1284     read_pos = *data_len;
1285   } else {
1286     read_pos = 0;
1287   }
1288 
1289   bool end_of_stream = false;
1290   do {
1291     // Read until buffer is full, end of stream, or error
1292     while (!end_of_stream && (read_pos < buffer_len)) {
1293       result = source->Read(buffer + read_pos, buffer_len - read_pos,
1294                             &count, NULL);
1295       if (result == SR_EOS) {
1296         end_of_stream = true;
1297       } else if (result != SR_SUCCESS) {
1298         if (data_len) {
1299           *data_len = read_pos;
1300         }
1301         return result;
1302       } else {
1303         read_pos += count;
1304       }
1305     }
1306 
1307     // Write until buffer is empty, or error (including end of stream)
1308     write_pos = 0;
1309     while (write_pos < read_pos) {
1310       result = sink->Write(buffer + write_pos, read_pos - write_pos,
1311                            &count, NULL);
1312       if (result != SR_SUCCESS) {
1313         if (data_len) {
1314           *data_len = read_pos - write_pos;
1315           if (write_pos > 0) {
1316             memmove(buffer, buffer + write_pos, *data_len);
1317           }
1318         }
1319         return result;
1320       }
1321       write_pos += count;
1322     }
1323 
1324     read_pos = 0;
1325   } while (!end_of_stream);
1326 
1327   if (data_len) {
1328     *data_len = 0;
1329   }
1330   return SR_SUCCESS;
1331 }
1332 
1333 ///////////////////////////////////////////////////////////////////////////////
1334 
1335 }  // namespace rtc
1336