• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2011, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
#if defined(POSIX)
#include <sys/file.h>
#endif  // POSIX
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <new>
#include <string>
#include "talk/base/basictypes.h"
#include "talk/base/common.h"
#include "talk/base/messagequeue.h"
#include "talk/base/stream.h"
#include "talk/base/stringencode.h"
#include "talk/base/stringutils.h"
#include "talk/base/thread.h"
42 
43 #ifdef WIN32
44 #include "talk/base/win32.h"
45 #define fileno _fileno
46 #endif
47 
48 namespace talk_base {
49 
50 ///////////////////////////////////////////////////////////////////////////////
51 // StreamInterface
52 ///////////////////////////////////////////////////////////////////////////////
53 
// Message id that StreamInterface::PostEvent posts and
// StreamInterface::OnMessage recognizes.
enum { MSG_POST_EVENT = 0xF1F1 };
57 
~StreamInterface()58 StreamInterface::~StreamInterface() {
59 }
60 
61 struct PostEventData : public MessageData {
62   int events, error;
PostEventDatatalk_base::PostEventData63   PostEventData(int ev, int er) : events(ev), error(er) { }
64 };
65 
WriteAll(const void * data,size_t data_len,size_t * written,int * error)66 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
67                                        size_t* written, int* error) {
68   StreamResult result = SR_SUCCESS;
69   size_t total_written = 0, current_written;
70   while (total_written < data_len) {
71     result = Write(static_cast<const char*>(data) + total_written,
72                    data_len - total_written, &current_written, error);
73     if (result != SR_SUCCESS)
74       break;
75     total_written += current_written;
76   }
77   if (written)
78     *written = total_written;
79   return result;
80 }
81 
ReadAll(void * buffer,size_t buffer_len,size_t * read,int * error)82 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
83                                       size_t* read, int* error) {
84   StreamResult result = SR_SUCCESS;
85   size_t total_read = 0, current_read;
86   while (total_read < buffer_len) {
87     result = Read(static_cast<char*>(buffer) + total_read,
88                   buffer_len - total_read, &current_read, error);
89     if (result != SR_SUCCESS)
90       break;
91     total_read += current_read;
92   }
93   if (read)
94     *read = total_read;
95   return result;
96 }
97 
ReadLine(std::string * line)98 StreamResult StreamInterface::ReadLine(std::string* line) {
99   line->clear();
100   StreamResult result = SR_SUCCESS;
101   while (true) {
102     char ch;
103     result = Read(&ch, sizeof(ch), NULL, NULL);
104     if (result != SR_SUCCESS) {
105       break;
106     }
107     if (ch == '\n') {
108       break;
109     }
110     line->push_back(ch);
111   }
112   if (!line->empty()) {   // give back the line we've collected so far with
113     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
114   }
115   return result;
116 }
117 
PostEvent(Thread * t,int events,int err)118 void StreamInterface::PostEvent(Thread* t, int events, int err) {
119   t->Post(this, MSG_POST_EVENT, new PostEventData(events, err));
120 }
121 
PostEvent(int events,int err)122 void StreamInterface::PostEvent(int events, int err) {
123   PostEvent(Thread::Current(), events, err);
124 }
125 
StreamInterface()126 StreamInterface::StreamInterface() {
127 }
128 
OnMessage(Message * msg)129 void StreamInterface::OnMessage(Message* msg) {
130   if (MSG_POST_EVENT == msg->message_id) {
131     PostEventData* pe = static_cast<PostEventData*>(msg->pdata);
132     SignalEvent(this, pe->events, pe->error);
133     delete msg->pdata;
134   }
135 }
136 
137 ///////////////////////////////////////////////////////////////////////////////
138 // StreamAdapterInterface
139 ///////////////////////////////////////////////////////////////////////////////
140 
StreamAdapterInterface(StreamInterface * stream,bool owned)141 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
142                                                bool owned)
143     : stream_(stream), owned_(owned) {
144   if (NULL != stream_)
145     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
146 }
147 
Attach(StreamInterface * stream,bool owned)148 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
149   if (NULL != stream_)
150     stream_->SignalEvent.disconnect(this);
151   if (owned_)
152     delete stream_;
153   stream_ = stream;
154   owned_ = owned;
155   if (NULL != stream_)
156     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
157 }
158 
Detach()159 StreamInterface* StreamAdapterInterface::Detach() {
160   if (NULL != stream_)
161     stream_->SignalEvent.disconnect(this);
162   StreamInterface* stream = stream_;
163   stream_ = NULL;
164   return stream;
165 }
166 
~StreamAdapterInterface()167 StreamAdapterInterface::~StreamAdapterInterface() {
168   if (owned_)
169     delete stream_;
170 }
171 
172 ///////////////////////////////////////////////////////////////////////////////
173 // StreamTap
174 ///////////////////////////////////////////////////////////////////////////////
175 
StreamTap(StreamInterface * stream,StreamInterface * tap)176 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
177 : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
178   tap_error_(0)
179 {
180   AttachTap(tap);
181 }
182 
AttachTap(StreamInterface * tap)183 void StreamTap::AttachTap(StreamInterface* tap) {
184   tap_.reset(tap);
185 }
186 
DetachTap()187 StreamInterface* StreamTap::DetachTap() {
188   return tap_.release();
189 }
190 
GetTapResult(int * error)191 StreamResult StreamTap::GetTapResult(int* error) {
192   if (error) {
193     *error = tap_error_;
194   }
195   return tap_result_;
196 }
197 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)198 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
199                              size_t* read, int* error) {
200   size_t backup_read;
201   if (!read) {
202     read = &backup_read;
203   }
204   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
205                                                   read, error);
206   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
207     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
208   }
209   return res;
210 }
211 
Write(const void * data,size_t data_len,size_t * written,int * error)212 StreamResult StreamTap::Write(const void* data, size_t data_len,
213                               size_t* written, int* error) {
214   size_t backup_written;
215   if (!written) {
216     written = &backup_written;
217   }
218   StreamResult res = StreamAdapterInterface::Write(data, data_len,
219                                                    written, error);
220   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
221     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
222   }
223   return res;
224 }
225 
226 ///////////////////////////////////////////////////////////////////////////////
227 // StreamSegment
228 ///////////////////////////////////////////////////////////////////////////////
229 
StreamSegment(StreamInterface * stream)230 StreamSegment::StreamSegment(StreamInterface* stream)
231 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
232   length_(SIZE_UNKNOWN)
233 {
234   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
235   stream->GetPosition(&start_);
236 }
237 
StreamSegment(StreamInterface * stream,size_t length)238 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
239 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
240   length_(length)
241 {
242   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
243   stream->GetPosition(&start_);
244 }
245 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)246 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
247                                  size_t* read, int* error)
248 {
249   if (SIZE_UNKNOWN != length_) {
250     if (pos_ >= length_)
251       return SR_EOS;
252     buffer_len = _min(buffer_len, length_ - pos_);
253   }
254   size_t backup_read;
255   if (!read) {
256     read = &backup_read;
257   }
258   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
259                                                      read, error);
260   if (SR_SUCCESS == result) {
261     pos_ += *read;
262   }
263   return result;
264 }
265 
SetPosition(size_t position)266 bool StreamSegment::SetPosition(size_t position) {
267   if (SIZE_UNKNOWN == start_)
268     return false;  // Not seekable
269   if ((SIZE_UNKNOWN != length_) && (position > length_))
270     return false;  // Seek past end of segment
271   if (!StreamAdapterInterface::SetPosition(start_ + position))
272     return false;
273   pos_ = position;
274   return true;
275 }
276 
GetPosition(size_t * position) const277 bool StreamSegment::GetPosition(size_t* position) const {
278   if (SIZE_UNKNOWN == start_)
279     return false;  // Not seekable
280   if (!StreamAdapterInterface::GetPosition(position))
281     return false;
282   if (position) {
283     ASSERT(*position >= start_);
284     *position -= start_;
285   }
286   return true;
287 }
288 
GetSize(size_t * size) const289 bool StreamSegment::GetSize(size_t* size) const {
290   if (!StreamAdapterInterface::GetSize(size))
291     return false;
292   if (size) {
293     if (SIZE_UNKNOWN != start_) {
294       ASSERT(*size >= start_);
295       *size -= start_;
296     }
297     if (SIZE_UNKNOWN != length_) {
298       *size = _min(*size, length_);
299     }
300   }
301   return true;
302 }
303 
GetAvailable(size_t * size) const304 bool StreamSegment::GetAvailable(size_t* size) const {
305   if (!StreamAdapterInterface::GetAvailable(size))
306     return false;
307   if (size && (SIZE_UNKNOWN != length_))
308     *size = _min(*size, length_ - pos_);
309   return true;
310 }
311 
312 ///////////////////////////////////////////////////////////////////////////////
313 // NullStream
314 ///////////////////////////////////////////////////////////////////////////////
315 
NullStream()316 NullStream::NullStream() {
317 }
318 
~NullStream()319 NullStream::~NullStream() {
320 }
321 
GetState() const322 StreamState NullStream::GetState() const {
323   return SS_OPEN;
324 }
325 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)326 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
327                               size_t* read, int* error) {
328   if (error) *error = -1;
329   return SR_ERROR;
330 }
331 
Write(const void * data,size_t data_len,size_t * written,int * error)332 StreamResult NullStream::Write(const void* data, size_t data_len,
333                                size_t* written, int* error) {
334   if (written) *written = data_len;
335   return SR_SUCCESS;
336 }
337 
Close()338 void NullStream::Close() {
339 }
340 
341 ///////////////////////////////////////////////////////////////////////////////
342 // FileStream
343 ///////////////////////////////////////////////////////////////////////////////
344 
FileStream()345 FileStream::FileStream() : file_(NULL) {
346 }
347 
~FileStream()348 FileStream::~FileStream() {
349   FileStream::Close();
350 }
351 
Open(const std::string & filename,const char * mode)352 bool FileStream::Open(const std::string& filename, const char* mode) {
353   Close();
354 #ifdef WIN32
355   std::wstring wfilename;
356   if (Utf8ToWindowsFilename(filename, &wfilename)) {
357     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
358   } else {
359     file_ = NULL;
360   }
361 #else
362   file_ = fopen(filename.c_str(), mode);
363 #endif
364   return (file_ != NULL);
365 }
366 
OpenShare(const std::string & filename,const char * mode,int shflag)367 bool FileStream::OpenShare(const std::string& filename, const char* mode,
368                            int shflag) {
369   Close();
370 #ifdef WIN32
371   std::wstring wfilename;
372   if (Utf8ToWindowsFilename(filename, &wfilename)) {
373     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
374   } else {
375     file_ = NULL;
376   }
377 #else
378   return Open(filename, mode);
379 #endif
380   return (file_ != NULL);
381 }
382 
DisableBuffering()383 bool FileStream::DisableBuffering() {
384   if (!file_)
385     return false;
386   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
387 }
388 
GetState() const389 StreamState FileStream::GetState() const {
390   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
391 }
392 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)393 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
394                               size_t* read, int* error) {
395   if (!file_)
396     return SR_EOS;
397   size_t result = fread(buffer, 1, buffer_len, file_);
398   if ((result == 0) && (buffer_len > 0)) {
399     if (feof(file_))
400       return SR_EOS;
401     if (error)
402       *error = errno;
403     return SR_ERROR;
404   }
405   if (read)
406     *read = result;
407   return SR_SUCCESS;
408 }
409 
Write(const void * data,size_t data_len,size_t * written,int * error)410 StreamResult FileStream::Write(const void* data, size_t data_len,
411                                size_t* written, int* error) {
412   if (!file_)
413     return SR_EOS;
414   size_t result = fwrite(data, 1, data_len, file_);
415   if ((result == 0) && (data_len > 0)) {
416     if (error)
417       *error = errno;
418     return SR_ERROR;
419   }
420   if (written)
421     *written = result;
422   return SR_SUCCESS;
423 }
424 
Close()425 void FileStream::Close() {
426   if (file_) {
427     DoClose();
428     file_ = NULL;
429   }
430 }
431 
SetPosition(size_t position)432 bool FileStream::SetPosition(size_t position) {
433   if (!file_)
434     return false;
435   return (fseek(file_, position, SEEK_SET) == 0);
436 }
437 
GetPosition(size_t * position) const438 bool FileStream::GetPosition(size_t* position) const {
439   ASSERT(NULL != position);
440   if (!file_)
441     return false;
442   long result = ftell(file_);
443   if (result < 0)
444     return false;
445   if (position)
446     *position = result;
447   return true;
448 }
449 
GetSize(size_t * size) const450 bool FileStream::GetSize(size_t* size) const {
451   ASSERT(NULL != size);
452   if (!file_)
453     return false;
454   struct stat file_stats;
455   if (fstat(fileno(file_), &file_stats) != 0)
456     return false;
457   if (size)
458     *size = file_stats.st_size;
459   return true;
460 }
461 
GetAvailable(size_t * size) const462 bool FileStream::GetAvailable(size_t* size) const {
463   ASSERT(NULL != size);
464   if (!GetSize(size))
465     return false;
466   long result = ftell(file_);
467   if (result < 0)
468     return false;
469   if (size)
470     *size -= result;
471   return true;
472 }
473 
ReserveSize(size_t size)474 bool FileStream::ReserveSize(size_t size) {
475   // TODO: extend the file to the proper length
476   return true;
477 }
478 
GetSize(const std::string & filename,size_t * size)479 bool FileStream::GetSize(const std::string& filename, size_t* size) {
480   struct stat file_stats;
481   if (stat(filename.c_str(), &file_stats) != 0)
482     return false;
483   *size = file_stats.st_size;
484   return true;
485 }
486 
Flush()487 bool FileStream::Flush() {
488   if (file_) {
489     return (0 == fflush(file_));
490   }
491   // try to flush empty file?
492   ASSERT(false);
493   return false;
494 }
495 
496 #if defined(POSIX)
497 
TryLock()498 bool FileStream::TryLock() {
499   if (file_ == NULL) {
500     // Stream not open.
501     ASSERT(false);
502     return false;
503   }
504 
505   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
506 }
507 
Unlock()508 bool FileStream::Unlock() {
509   if (file_ == NULL) {
510     // Stream not open.
511     ASSERT(false);
512     return false;
513   }
514 
515   return flock(fileno(file_), LOCK_UN) == 0;
516 }
517 
518 #endif
519 
DoClose()520 void FileStream::DoClose() {
521   fclose(file_);
522 }
523 
524 #ifdef POSIX
525 
526 // Have to identically rewrite the FileStream destructor or else it would call
527 // the base class's Close() instead of the sub-class's.
~POpenStream()528 POpenStream::~POpenStream() {
529   POpenStream::Close();
530 }
531 
Open(const std::string & subcommand,const char * mode)532 bool POpenStream::Open(const std::string& subcommand, const char* mode) {
533   Close();
534   file_ = popen(subcommand.c_str(), mode);
535   return file_ != NULL;
536 }
537 
OpenShare(const std::string & subcommand,const char * mode,int shflag)538 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
539                             int shflag) {
540   return Open(subcommand, mode);
541 }
542 
DoClose()543 void POpenStream::DoClose() {
544   wait_status_ = pclose(file_);
545 }
546 
547 #endif
548 
549 ///////////////////////////////////////////////////////////////////////////////
550 // MemoryStream
551 ///////////////////////////////////////////////////////////////////////////////
552 
MemoryStreamBase()553 MemoryStreamBase::MemoryStreamBase()
554   : buffer_(NULL), buffer_length_(0), data_length_(0),
555     seek_position_(0) {
556 }
557 
GetState() const558 StreamState MemoryStreamBase::GetState() const {
559   return SS_OPEN;
560 }
561 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)562 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
563                                     size_t* bytes_read, int* error) {
564   if (seek_position_ >= data_length_) {
565     return SR_EOS;
566   }
567   size_t available = data_length_ - seek_position_;
568   if (bytes > available) {
569     // Read partial buffer
570     bytes = available;
571   }
572   memcpy(buffer, &buffer_[seek_position_], bytes);
573   seek_position_ += bytes;
574   if (bytes_read) {
575     *bytes_read = bytes;
576   }
577   return SR_SUCCESS;
578 }
579 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)580 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
581                                      size_t* bytes_written, int* error) {
582   size_t available = buffer_length_ - seek_position_;
583   if (0 == available) {
584     // Increase buffer size to the larger of:
585     // a) new position rounded up to next 256 bytes
586     // b) double the previous length
587     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
588                                     buffer_length_ * 2);
589     StreamResult result = DoReserve(new_buffer_length, error);
590     if (SR_SUCCESS != result) {
591       return result;
592     }
593     ASSERT(buffer_length_ >= new_buffer_length);
594     available = buffer_length_ - seek_position_;
595   }
596 
597   if (bytes > available) {
598     bytes = available;
599   }
600   memcpy(&buffer_[seek_position_], buffer, bytes);
601   seek_position_ += bytes;
602   if (data_length_ < seek_position_) {
603     data_length_ = seek_position_;
604   }
605   if (bytes_written) {
606     *bytes_written = bytes;
607   }
608   return SR_SUCCESS;
609 }
610 
Close()611 void MemoryStreamBase::Close() {
612   // nothing to do
613 }
614 
SetPosition(size_t position)615 bool MemoryStreamBase::SetPosition(size_t position) {
616   if (position > data_length_)
617     return false;
618   seek_position_ = position;
619   return true;
620 }
621 
GetPosition(size_t * position) const622 bool MemoryStreamBase::GetPosition(size_t *position) const {
623   if (position)
624     *position = seek_position_;
625   return true;
626 }
627 
GetSize(size_t * size) const628 bool MemoryStreamBase::GetSize(size_t *size) const {
629   if (size)
630     *size = data_length_;
631   return true;
632 }
633 
GetAvailable(size_t * size) const634 bool MemoryStreamBase::GetAvailable(size_t *size) const {
635   if (size)
636     *size = data_length_ - seek_position_;
637   return true;
638 }
639 
ReserveSize(size_t size)640 bool MemoryStreamBase::ReserveSize(size_t size) {
641   return (SR_SUCCESS == DoReserve(size, NULL));
642 }
643 
DoReserve(size_t size,int * error)644 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
645   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
646 }
647 
648 ///////////////////////////////////////////////////////////////////////////////
649 
MemoryStream()650 MemoryStream::MemoryStream()
651   : buffer_alloc_(NULL) {
652 }
653 
MemoryStream(const char * data)654 MemoryStream::MemoryStream(const char* data)
655   : buffer_alloc_(NULL) {
656   SetData(data, strlen(data));
657 }
658 
MemoryStream(const void * data,size_t length)659 MemoryStream::MemoryStream(const void* data, size_t length)
660   : buffer_alloc_(NULL) {
661   SetData(data, length);
662 }
663 
~MemoryStream()664 MemoryStream::~MemoryStream() {
665   delete [] buffer_alloc_;
666 }
667 
SetData(const void * data,size_t length)668 void MemoryStream::SetData(const void* data, size_t length) {
669   data_length_ = buffer_length_ = length;
670   delete [] buffer_alloc_;
671   buffer_alloc_ = new char[buffer_length_ + kAlignment];
672   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
673   memcpy(buffer_, data, data_length_);
674   seek_position_ = 0;
675 }
676 
DoReserve(size_t size,int * error)677 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
678   if (buffer_length_ >= size)
679     return SR_SUCCESS;
680 
681   if (char* new_buffer_alloc = new char[size + kAlignment]) {
682     char* new_buffer = reinterpret_cast<char*>(
683         ALIGNP(new_buffer_alloc, kAlignment));
684     memcpy(new_buffer, buffer_, data_length_);
685     delete [] buffer_alloc_;
686     buffer_alloc_ = new_buffer_alloc;
687     buffer_ = new_buffer;
688     buffer_length_ = size;
689     return SR_SUCCESS;
690   }
691 
692   if (error) {
693     *error = ENOMEM;
694   }
695   return SR_ERROR;
696 }
697 
698 ///////////////////////////////////////////////////////////////////////////////
699 
ExternalMemoryStream()700 ExternalMemoryStream::ExternalMemoryStream() {
701 }
702 
ExternalMemoryStream(void * data,size_t length)703 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
704   SetData(data, length);
705 }
706 
~ExternalMemoryStream()707 ExternalMemoryStream::~ExternalMemoryStream() {
708 }
709 
SetData(void * data,size_t length)710 void ExternalMemoryStream::SetData(void* data, size_t length) {
711   data_length_ = buffer_length_ = length;
712   buffer_ = static_cast<char*>(data);
713   seek_position_ = 0;
714 }
715 
716 ///////////////////////////////////////////////////////////////////////////////
717 // FifoBuffer
718 ///////////////////////////////////////////////////////////////////////////////
719 
FifoBuffer(size_t size)720 FifoBuffer::FifoBuffer(size_t size)
721     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
722       data_length_(0), read_position_(0), owner_(Thread::Current()) {
723   // all events are done on the owner_ thread
724 }
725 
~FifoBuffer()726 FifoBuffer::~FifoBuffer() {
727 }
728 
GetBuffered(size_t * size) const729 bool FifoBuffer::GetBuffered(size_t* size) const {
730   CritScope cs(&crit_);
731   *size = data_length_;
732   return true;
733 }
734 
SetCapacity(size_t size)735 bool FifoBuffer::SetCapacity(size_t size) {
736   CritScope cs(&crit_);
737   if (data_length_ > size) {
738     return false;
739   }
740 
741   if (size != buffer_length_) {
742     char* buffer = new char[size];
743     const size_t copy = data_length_;
744     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
745     memcpy(buffer, &buffer_[read_position_], tail_copy);
746     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
747     buffer_.reset(buffer);
748     read_position_ = 0;
749     buffer_length_ = size;
750   }
751   return true;
752 }
753 
GetState() const754 StreamState FifoBuffer::GetState() const {
755   return state_;
756 }
757 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)758 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
759                               size_t* bytes_read, int* error) {
760   CritScope cs(&crit_);
761   const size_t available = data_length_;
762   if (0 == available) {
763     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
764   }
765 
766   const bool was_writable = data_length_ < buffer_length_;
767   const size_t copy = _min(bytes, available);
768   const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
769   char* const p = static_cast<char*>(buffer);
770   memcpy(p, &buffer_[read_position_], tail_copy);
771   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
772   read_position_ = (read_position_ + copy) % buffer_length_;
773   data_length_ -= copy;
774   if (bytes_read) {
775     *bytes_read = copy;
776   }
777   // if we were full before, and now we're not, post an event
778   if (!was_writable && copy > 0) {
779     PostEvent(owner_, SE_WRITE, 0);
780   }
781 
782   return SR_SUCCESS;
783 }
784 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)785 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
786                                size_t* bytes_written, int* error) {
787   CritScope cs(&crit_);
788   if (state_ == SS_CLOSED) {
789     return SR_EOS;
790   }
791 
792   const size_t available = buffer_length_ - data_length_;
793   if (0 == available) {
794     return SR_BLOCK;
795   }
796 
797   const bool was_readable = (data_length_ > 0);
798   const size_t write_position = (read_position_ + data_length_)
799       % buffer_length_;
800   const size_t copy = _min(bytes, available);
801   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
802   const char* const p = static_cast<const char*>(buffer);
803   memcpy(&buffer_[write_position], p, tail_copy);
804   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
805   data_length_ += copy;
806   if (bytes_written) {
807     *bytes_written = copy;
808   }
809   // if we didn't have any data to read before, and now we do, post an event
810   if (!was_readable && copy > 0) {
811     PostEvent(owner_, SE_READ, 0);
812   }
813 
814   return SR_SUCCESS;
815 }
816 
Close()817 void FifoBuffer::Close() {
818   CritScope cs(&crit_);
819   state_ = SS_CLOSED;
820 }
821 
GetReadData(size_t * size)822 const void* FifoBuffer::GetReadData(size_t* size) {
823   CritScope cs(&crit_);
824   *size = (read_position_ + data_length_ <= buffer_length_) ?
825       data_length_ : buffer_length_ - read_position_;
826   return &buffer_[read_position_];
827 }
828 
ConsumeReadData(size_t size)829 void FifoBuffer::ConsumeReadData(size_t size) {
830   CritScope cs(&crit_);
831   ASSERT(size <= data_length_);
832   const bool was_writable = data_length_ < buffer_length_;
833   read_position_ = (read_position_ + size) % buffer_length_;
834   data_length_ -= size;
835   if (!was_writable && size > 0) {
836     PostEvent(owner_, SE_WRITE, 0);
837   }
838 }
839 
GetWriteBuffer(size_t * size)840 void* FifoBuffer::GetWriteBuffer(size_t* size) {
841   CritScope cs(&crit_);
842   if (state_ == SS_CLOSED) {
843     return NULL;
844   }
845 
846   // if empty, reset the write position to the beginning, so we can get
847   // the biggest possible block
848   if (data_length_ == 0) {
849     read_position_ = 0;
850   }
851 
852   const size_t write_position = (read_position_ + data_length_)
853       % buffer_length_;
854   *size = (write_position >= read_position_) ?
855       buffer_length_ - write_position : read_position_ - write_position;
856   return &buffer_[write_position];
857 }
858 
ConsumeWriteBuffer(size_t size)859 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
860   CritScope cs(&crit_);
861   ASSERT(size <= buffer_length_ - data_length_);
862   const bool was_readable = (data_length_ > 0);
863   data_length_ += size;
864   if (!was_readable && size > 0) {
865     PostEvent(owner_, SE_READ, 0);
866   }
867 }
868 
869 ///////////////////////////////////////////////////////////////////////////////
870 // LoggingAdapter
871 ///////////////////////////////////////////////////////////////////////////////
872 
LoggingAdapter(StreamInterface * stream,LoggingSeverity level,const std::string & label,bool hex_mode)873 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
874                                const std::string& label, bool hex_mode)
875 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode)
876 {
877   set_label(label);
878 }
879 
set_label(const std::string & label)880 void LoggingAdapter::set_label(const std::string& label) {
881   label_.assign("[");
882   label_.append(label);
883   label_.append("]");
884 }
885 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)886 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
887                                   size_t* read, int* error) {
888   size_t local_read; if (!read) read = &local_read;
889   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
890                                                      error);
891   if (result == SR_SUCCESS) {
892     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
893   }
894   return result;
895 }
896 
Write(const void * data,size_t data_len,size_t * written,int * error)897 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
898                                    size_t* written, int* error) {
899   size_t local_written; if (!written) written = &local_written;
900   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
901                                                       error);
902   if (result == SR_SUCCESS) {
903     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
904                  &lms_);
905   }
906   return result;
907 }
908 
Close()909 void LoggingAdapter::Close() {
910   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
911   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
912   LOG_V(level_) << label_ << " Closed locally";
913   StreamAdapterInterface::Close();
914 }
915 
OnEvent(StreamInterface * stream,int events,int err)916 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
917   if (events & SE_OPEN) {
918     LOG_V(level_) << label_ << " Open";
919   } else if (events & SE_CLOSE) {
920     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
921     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
922     LOG_V(level_) << label_ << " Closed with error: " << err;
923   }
924   StreamAdapterInterface::OnEvent(stream, events, err);
925 }
926 
927 ///////////////////////////////////////////////////////////////////////////////
928 // StringStream - Reads/Writes to an external std::string
929 ///////////////////////////////////////////////////////////////////////////////
930 
StringStream(std::string & str)931 StringStream::StringStream(std::string& str)
932 : str_(str), read_pos_(0), read_only_(false)
933 {
934 }
935 
StringStream(const std::string & str)936 StringStream::StringStream(const std::string& str)
937 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true)
938 {
939 }
940 
GetState() const941 StreamState StringStream::GetState() const {
942   return SS_OPEN;
943 }
944 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)945 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
946                                       size_t* read, int* error) {
947   size_t available = _min(buffer_len, str_.size() - read_pos_);
948   if (!available)
949     return SR_EOS;
950   memcpy(buffer, str_.data() + read_pos_, available);
951   read_pos_ += available;
952   if (read)
953     *read = available;
954   return SR_SUCCESS;
955 }
956 
Write(const void * data,size_t data_len,size_t * written,int * error)957 StreamResult StringStream::Write(const void* data, size_t data_len,
958                                       size_t* written, int* error) {
959   if (read_only_) {
960     if (error) {
961       *error = -1;
962     }
963     return SR_ERROR;
964   }
965   str_.append(static_cast<const char*>(data),
966               static_cast<const char*>(data) + data_len);
967   if (written)
968     *written = data_len;
969   return SR_SUCCESS;
970 }
971 
Close()972 void StringStream::Close() {
973 }
974 
SetPosition(size_t position)975 bool StringStream::SetPosition(size_t position) {
976   if (position > str_.size())
977     return false;
978   read_pos_ = position;
979   return true;
980 }
981 
GetPosition(size_t * position) const982 bool StringStream::GetPosition(size_t* position) const {
983   if (position)
984     *position = read_pos_;
985   return true;
986 }
987 
GetSize(size_t * size) const988 bool StringStream::GetSize(size_t* size) const {
989   if (size)
990     *size = str_.size();
991   return true;
992 }
993 
GetAvailable(size_t * size) const994 bool StringStream::GetAvailable(size_t* size) const {
995   if (size)
996     *size = str_.size() - read_pos_;
997   return true;
998 }
999 
ReserveSize(size_t size)1000 bool StringStream::ReserveSize(size_t size) {
1001   if (read_only_)
1002     return false;
1003   str_.reserve(size);
1004   return true;
1005 }
1006 
1007 ///////////////////////////////////////////////////////////////////////////////
1008 // StreamReference
1009 ///////////////////////////////////////////////////////////////////////////////
1010 
StreamReference(StreamInterface * stream)1011 StreamReference::StreamReference(StreamInterface* stream)
1012     : StreamAdapterInterface(stream, false) {
1013   // owner set to false so the destructor does not free the stream.
1014   stream_ref_count_ = new StreamRefCount(stream);
1015 }
1016 
NewReference()1017 StreamInterface* StreamReference::NewReference() {
1018   stream_ref_count_->AddReference();
1019   return new StreamReference(stream_ref_count_, stream());
1020 }
1021 
~StreamReference()1022 StreamReference::~StreamReference() {
1023   stream_ref_count_->Release();
1024 }
1025 
StreamReference(StreamRefCount * stream_ref_count,StreamInterface * stream)1026 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1027                                  StreamInterface* stream)
1028     : StreamAdapterInterface(stream, false),
1029       stream_ref_count_(stream_ref_count) {
1030 }
1031 
1032 ///////////////////////////////////////////////////////////////////////////////
1033 
Flow(StreamInterface * source,char * buffer,size_t buffer_len,StreamInterface * sink,size_t * data_len)1034 StreamResult Flow(StreamInterface* source,
1035                   char* buffer, size_t buffer_len,
1036                   StreamInterface* sink,
1037                   size_t* data_len /* = NULL */) {
1038   ASSERT(buffer_len > 0);
1039 
1040   StreamResult result;
1041   size_t count, read_pos, write_pos;
1042   if (data_len) {
1043     read_pos = *data_len;
1044   } else {
1045     read_pos = 0;
1046   }
1047 
1048   bool end_of_stream = false;
1049   do {
1050     // Read until buffer is full, end of stream, or error
1051     while (!end_of_stream && (read_pos < buffer_len)) {
1052       result = source->Read(buffer + read_pos, buffer_len - read_pos,
1053                             &count, NULL);
1054       if (result == SR_EOS) {
1055         end_of_stream = true;
1056       } else if (result != SR_SUCCESS) {
1057         if (data_len) {
1058           *data_len = read_pos;
1059         }
1060         return result;
1061       } else {
1062         read_pos += count;
1063       }
1064     }
1065 
1066     // Write until buffer is empty, or error (including end of stream)
1067     write_pos = 0;
1068     while (write_pos < read_pos) {
1069       result = sink->Write(buffer + write_pos, read_pos - write_pos,
1070                            &count, NULL);
1071       if (result != SR_SUCCESS) {
1072         if (data_len) {
1073           *data_len = read_pos - write_pos;
1074           if (write_pos > 0) {
1075             memmove(buffer, buffer + write_pos, *data_len);
1076           }
1077         }
1078         return result;
1079       }
1080       write_pos += count;
1081     }
1082 
1083     read_pos = 0;
1084   } while (!end_of_stream);
1085 
1086   if (data_len) {
1087     *data_len = 0;
1088   }
1089   return SR_SUCCESS;
1090 }
1091 
1092 ///////////////////////////////////////////////////////////////////////////////
1093 
1094 } // namespace talk_base
1095