1 /*
2 * libjingle
3 * Copyright 2011, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #if defined(POSIX)
29 #include <sys/file.h>
30 #endif // POSIX
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <errno.h>
34 #include <string>
35 #include "talk/base/basictypes.h"
36 #include "talk/base/common.h"
37 #include "talk/base/messagequeue.h"
38 #include "talk/base/stream.h"
39 #include "talk/base/stringencode.h"
40 #include "talk/base/stringutils.h"
41 #include "talk/base/thread.h"
42
43 #ifdef WIN32
44 #include "talk/base/win32.h"
45 #define fileno _fileno
46 #endif
47
48 namespace talk_base {
49
50 ///////////////////////////////////////////////////////////////////////////////
51 // StreamInterface
52 ///////////////////////////////////////////////////////////////////////////////
53
// Message id shared by StreamInterface::PostEvent(), which posts it, and
// StreamInterface::OnMessage(), which recognises it.
enum { MSG_POST_EVENT = 0xF1F1 };
57
~StreamInterface()58 StreamInterface::~StreamInterface() {
59 }
60
61 struct PostEventData : public MessageData {
62 int events, error;
PostEventDatatalk_base::PostEventData63 PostEventData(int ev, int er) : events(ev), error(er) { }
64 };
65
WriteAll(const void * data,size_t data_len,size_t * written,int * error)66 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
67 size_t* written, int* error) {
68 StreamResult result = SR_SUCCESS;
69 size_t total_written = 0, current_written;
70 while (total_written < data_len) {
71 result = Write(static_cast<const char*>(data) + total_written,
72 data_len - total_written, ¤t_written, error);
73 if (result != SR_SUCCESS)
74 break;
75 total_written += current_written;
76 }
77 if (written)
78 *written = total_written;
79 return result;
80 }
81
ReadAll(void * buffer,size_t buffer_len,size_t * read,int * error)82 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
83 size_t* read, int* error) {
84 StreamResult result = SR_SUCCESS;
85 size_t total_read = 0, current_read;
86 while (total_read < buffer_len) {
87 result = Read(static_cast<char*>(buffer) + total_read,
88 buffer_len - total_read, ¤t_read, error);
89 if (result != SR_SUCCESS)
90 break;
91 total_read += current_read;
92 }
93 if (read)
94 *read = total_read;
95 return result;
96 }
97
ReadLine(std::string * line)98 StreamResult StreamInterface::ReadLine(std::string* line) {
99 line->clear();
100 StreamResult result = SR_SUCCESS;
101 while (true) {
102 char ch;
103 result = Read(&ch, sizeof(ch), NULL, NULL);
104 if (result != SR_SUCCESS) {
105 break;
106 }
107 if (ch == '\n') {
108 break;
109 }
110 line->push_back(ch);
111 }
112 if (!line->empty()) { // give back the line we've collected so far with
113 result = SR_SUCCESS; // a success code. Otherwise return the last code
114 }
115 return result;
116 }
117
PostEvent(Thread * t,int events,int err)118 void StreamInterface::PostEvent(Thread* t, int events, int err) {
119 t->Post(this, MSG_POST_EVENT, new PostEventData(events, err));
120 }
121
PostEvent(int events,int err)122 void StreamInterface::PostEvent(int events, int err) {
123 PostEvent(Thread::Current(), events, err);
124 }
125
StreamInterface()126 StreamInterface::StreamInterface() {
127 }
128
OnMessage(Message * msg)129 void StreamInterface::OnMessage(Message* msg) {
130 if (MSG_POST_EVENT == msg->message_id) {
131 PostEventData* pe = static_cast<PostEventData*>(msg->pdata);
132 SignalEvent(this, pe->events, pe->error);
133 delete msg->pdata;
134 }
135 }
136
137 ///////////////////////////////////////////////////////////////////////////////
138 // StreamAdapterInterface
139 ///////////////////////////////////////////////////////////////////////////////
140
StreamAdapterInterface(StreamInterface * stream,bool owned)141 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
142 bool owned)
143 : stream_(stream), owned_(owned) {
144 if (NULL != stream_)
145 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
146 }
147
Attach(StreamInterface * stream,bool owned)148 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
149 if (NULL != stream_)
150 stream_->SignalEvent.disconnect(this);
151 if (owned_)
152 delete stream_;
153 stream_ = stream;
154 owned_ = owned;
155 if (NULL != stream_)
156 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
157 }
158
Detach()159 StreamInterface* StreamAdapterInterface::Detach() {
160 if (NULL != stream_)
161 stream_->SignalEvent.disconnect(this);
162 StreamInterface* stream = stream_;
163 stream_ = NULL;
164 return stream;
165 }
166
~StreamAdapterInterface()167 StreamAdapterInterface::~StreamAdapterInterface() {
168 if (owned_)
169 delete stream_;
170 }
171
172 ///////////////////////////////////////////////////////////////////////////////
173 // StreamTap
174 ///////////////////////////////////////////////////////////////////////////////
175
StreamTap(StreamInterface * stream,StreamInterface * tap)176 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
177 : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
178 tap_error_(0)
179 {
180 AttachTap(tap);
181 }
182
AttachTap(StreamInterface * tap)183 void StreamTap::AttachTap(StreamInterface* tap) {
184 tap_.reset(tap);
185 }
186
DetachTap()187 StreamInterface* StreamTap::DetachTap() {
188 return tap_.release();
189 }
190
GetTapResult(int * error)191 StreamResult StreamTap::GetTapResult(int* error) {
192 if (error) {
193 *error = tap_error_;
194 }
195 return tap_result_;
196 }
197
Read(void * buffer,size_t buffer_len,size_t * read,int * error)198 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
199 size_t* read, int* error) {
200 size_t backup_read;
201 if (!read) {
202 read = &backup_read;
203 }
204 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
205 read, error);
206 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
207 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
208 }
209 return res;
210 }
211
Write(const void * data,size_t data_len,size_t * written,int * error)212 StreamResult StreamTap::Write(const void* data, size_t data_len,
213 size_t* written, int* error) {
214 size_t backup_written;
215 if (!written) {
216 written = &backup_written;
217 }
218 StreamResult res = StreamAdapterInterface::Write(data, data_len,
219 written, error);
220 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
221 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
222 }
223 return res;
224 }
225
226 ///////////////////////////////////////////////////////////////////////////////
227 // StreamSegment
228 ///////////////////////////////////////////////////////////////////////////////
229
StreamSegment(StreamInterface * stream)230 StreamSegment::StreamSegment(StreamInterface* stream)
231 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
232 length_(SIZE_UNKNOWN)
233 {
234 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
235 stream->GetPosition(&start_);
236 }
237
StreamSegment(StreamInterface * stream,size_t length)238 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
239 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
240 length_(length)
241 {
242 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
243 stream->GetPosition(&start_);
244 }
245
Read(void * buffer,size_t buffer_len,size_t * read,int * error)246 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
247 size_t* read, int* error)
248 {
249 if (SIZE_UNKNOWN != length_) {
250 if (pos_ >= length_)
251 return SR_EOS;
252 buffer_len = _min(buffer_len, length_ - pos_);
253 }
254 size_t backup_read;
255 if (!read) {
256 read = &backup_read;
257 }
258 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
259 read, error);
260 if (SR_SUCCESS == result) {
261 pos_ += *read;
262 }
263 return result;
264 }
265
SetPosition(size_t position)266 bool StreamSegment::SetPosition(size_t position) {
267 if (SIZE_UNKNOWN == start_)
268 return false; // Not seekable
269 if ((SIZE_UNKNOWN != length_) && (position > length_))
270 return false; // Seek past end of segment
271 if (!StreamAdapterInterface::SetPosition(start_ + position))
272 return false;
273 pos_ = position;
274 return true;
275 }
276
GetPosition(size_t * position) const277 bool StreamSegment::GetPosition(size_t* position) const {
278 if (SIZE_UNKNOWN == start_)
279 return false; // Not seekable
280 if (!StreamAdapterInterface::GetPosition(position))
281 return false;
282 if (position) {
283 ASSERT(*position >= start_);
284 *position -= start_;
285 }
286 return true;
287 }
288
GetSize(size_t * size) const289 bool StreamSegment::GetSize(size_t* size) const {
290 if (!StreamAdapterInterface::GetSize(size))
291 return false;
292 if (size) {
293 if (SIZE_UNKNOWN != start_) {
294 ASSERT(*size >= start_);
295 *size -= start_;
296 }
297 if (SIZE_UNKNOWN != length_) {
298 *size = _min(*size, length_);
299 }
300 }
301 return true;
302 }
303
GetAvailable(size_t * size) const304 bool StreamSegment::GetAvailable(size_t* size) const {
305 if (!StreamAdapterInterface::GetAvailable(size))
306 return false;
307 if (size && (SIZE_UNKNOWN != length_))
308 *size = _min(*size, length_ - pos_);
309 return true;
310 }
311
312 ///////////////////////////////////////////////////////////////////////////////
313 // NullStream
314 ///////////////////////////////////////////////////////////////////////////////
315
NullStream()316 NullStream::NullStream() {
317 }
318
~NullStream()319 NullStream::~NullStream() {
320 }
321
GetState() const322 StreamState NullStream::GetState() const {
323 return SS_OPEN;
324 }
325
Read(void * buffer,size_t buffer_len,size_t * read,int * error)326 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
327 size_t* read, int* error) {
328 if (error) *error = -1;
329 return SR_ERROR;
330 }
331
Write(const void * data,size_t data_len,size_t * written,int * error)332 StreamResult NullStream::Write(const void* data, size_t data_len,
333 size_t* written, int* error) {
334 if (written) *written = data_len;
335 return SR_SUCCESS;
336 }
337
Close()338 void NullStream::Close() {
339 }
340
341 ///////////////////////////////////////////////////////////////////////////////
342 // FileStream
343 ///////////////////////////////////////////////////////////////////////////////
344
FileStream()345 FileStream::FileStream() : file_(NULL) {
346 }
347
~FileStream()348 FileStream::~FileStream() {
349 FileStream::Close();
350 }
351
Open(const std::string & filename,const char * mode)352 bool FileStream::Open(const std::string& filename, const char* mode) {
353 Close();
354 #ifdef WIN32
355 std::wstring wfilename;
356 if (Utf8ToWindowsFilename(filename, &wfilename)) {
357 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
358 } else {
359 file_ = NULL;
360 }
361 #else
362 file_ = fopen(filename.c_str(), mode);
363 #endif
364 return (file_ != NULL);
365 }
366
OpenShare(const std::string & filename,const char * mode,int shflag)367 bool FileStream::OpenShare(const std::string& filename, const char* mode,
368 int shflag) {
369 Close();
370 #ifdef WIN32
371 std::wstring wfilename;
372 if (Utf8ToWindowsFilename(filename, &wfilename)) {
373 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
374 } else {
375 file_ = NULL;
376 }
377 #else
378 return Open(filename, mode);
379 #endif
380 return (file_ != NULL);
381 }
382
DisableBuffering()383 bool FileStream::DisableBuffering() {
384 if (!file_)
385 return false;
386 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
387 }
388
GetState() const389 StreamState FileStream::GetState() const {
390 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
391 }
392
Read(void * buffer,size_t buffer_len,size_t * read,int * error)393 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
394 size_t* read, int* error) {
395 if (!file_)
396 return SR_EOS;
397 size_t result = fread(buffer, 1, buffer_len, file_);
398 if ((result == 0) && (buffer_len > 0)) {
399 if (feof(file_))
400 return SR_EOS;
401 if (error)
402 *error = errno;
403 return SR_ERROR;
404 }
405 if (read)
406 *read = result;
407 return SR_SUCCESS;
408 }
409
Write(const void * data,size_t data_len,size_t * written,int * error)410 StreamResult FileStream::Write(const void* data, size_t data_len,
411 size_t* written, int* error) {
412 if (!file_)
413 return SR_EOS;
414 size_t result = fwrite(data, 1, data_len, file_);
415 if ((result == 0) && (data_len > 0)) {
416 if (error)
417 *error = errno;
418 return SR_ERROR;
419 }
420 if (written)
421 *written = result;
422 return SR_SUCCESS;
423 }
424
Close()425 void FileStream::Close() {
426 if (file_) {
427 DoClose();
428 file_ = NULL;
429 }
430 }
431
SetPosition(size_t position)432 bool FileStream::SetPosition(size_t position) {
433 if (!file_)
434 return false;
435 return (fseek(file_, position, SEEK_SET) == 0);
436 }
437
GetPosition(size_t * position) const438 bool FileStream::GetPosition(size_t* position) const {
439 ASSERT(NULL != position);
440 if (!file_)
441 return false;
442 long result = ftell(file_);
443 if (result < 0)
444 return false;
445 if (position)
446 *position = result;
447 return true;
448 }
449
GetSize(size_t * size) const450 bool FileStream::GetSize(size_t* size) const {
451 ASSERT(NULL != size);
452 if (!file_)
453 return false;
454 struct stat file_stats;
455 if (fstat(fileno(file_), &file_stats) != 0)
456 return false;
457 if (size)
458 *size = file_stats.st_size;
459 return true;
460 }
461
GetAvailable(size_t * size) const462 bool FileStream::GetAvailable(size_t* size) const {
463 ASSERT(NULL != size);
464 if (!GetSize(size))
465 return false;
466 long result = ftell(file_);
467 if (result < 0)
468 return false;
469 if (size)
470 *size -= result;
471 return true;
472 }
473
ReserveSize(size_t size)474 bool FileStream::ReserveSize(size_t size) {
475 // TODO: extend the file to the proper length
476 return true;
477 }
478
GetSize(const std::string & filename,size_t * size)479 bool FileStream::GetSize(const std::string& filename, size_t* size) {
480 struct stat file_stats;
481 if (stat(filename.c_str(), &file_stats) != 0)
482 return false;
483 *size = file_stats.st_size;
484 return true;
485 }
486
Flush()487 bool FileStream::Flush() {
488 if (file_) {
489 return (0 == fflush(file_));
490 }
491 // try to flush empty file?
492 ASSERT(false);
493 return false;
494 }
495
496 #if defined(POSIX)
497
TryLock()498 bool FileStream::TryLock() {
499 if (file_ == NULL) {
500 // Stream not open.
501 ASSERT(false);
502 return false;
503 }
504
505 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
506 }
507
Unlock()508 bool FileStream::Unlock() {
509 if (file_ == NULL) {
510 // Stream not open.
511 ASSERT(false);
512 return false;
513 }
514
515 return flock(fileno(file_), LOCK_UN) == 0;
516 }
517
518 #endif
519
DoClose()520 void FileStream::DoClose() {
521 fclose(file_);
522 }
523
524 #ifdef POSIX
525
526 // Have to identically rewrite the FileStream destructor or else it would call
527 // the base class's Close() instead of the sub-class's.
~POpenStream()528 POpenStream::~POpenStream() {
529 POpenStream::Close();
530 }
531
Open(const std::string & subcommand,const char * mode)532 bool POpenStream::Open(const std::string& subcommand, const char* mode) {
533 Close();
534 file_ = popen(subcommand.c_str(), mode);
535 return file_ != NULL;
536 }
537
OpenShare(const std::string & subcommand,const char * mode,int shflag)538 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
539 int shflag) {
540 return Open(subcommand, mode);
541 }
542
DoClose()543 void POpenStream::DoClose() {
544 wait_status_ = pclose(file_);
545 }
546
547 #endif
548
549 ///////////////////////////////////////////////////////////////////////////////
550 // MemoryStream
551 ///////////////////////////////////////////////////////////////////////////////
552
MemoryStreamBase()553 MemoryStreamBase::MemoryStreamBase()
554 : buffer_(NULL), buffer_length_(0), data_length_(0),
555 seek_position_(0) {
556 }
557
GetState() const558 StreamState MemoryStreamBase::GetState() const {
559 return SS_OPEN;
560 }
561
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)562 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
563 size_t* bytes_read, int* error) {
564 if (seek_position_ >= data_length_) {
565 return SR_EOS;
566 }
567 size_t available = data_length_ - seek_position_;
568 if (bytes > available) {
569 // Read partial buffer
570 bytes = available;
571 }
572 memcpy(buffer, &buffer_[seek_position_], bytes);
573 seek_position_ += bytes;
574 if (bytes_read) {
575 *bytes_read = bytes;
576 }
577 return SR_SUCCESS;
578 }
579
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)580 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
581 size_t* bytes_written, int* error) {
582 size_t available = buffer_length_ - seek_position_;
583 if (0 == available) {
584 // Increase buffer size to the larger of:
585 // a) new position rounded up to next 256 bytes
586 // b) double the previous length
587 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
588 buffer_length_ * 2);
589 StreamResult result = DoReserve(new_buffer_length, error);
590 if (SR_SUCCESS != result) {
591 return result;
592 }
593 ASSERT(buffer_length_ >= new_buffer_length);
594 available = buffer_length_ - seek_position_;
595 }
596
597 if (bytes > available) {
598 bytes = available;
599 }
600 memcpy(&buffer_[seek_position_], buffer, bytes);
601 seek_position_ += bytes;
602 if (data_length_ < seek_position_) {
603 data_length_ = seek_position_;
604 }
605 if (bytes_written) {
606 *bytes_written = bytes;
607 }
608 return SR_SUCCESS;
609 }
610
Close()611 void MemoryStreamBase::Close() {
612 // nothing to do
613 }
614
SetPosition(size_t position)615 bool MemoryStreamBase::SetPosition(size_t position) {
616 if (position > data_length_)
617 return false;
618 seek_position_ = position;
619 return true;
620 }
621
GetPosition(size_t * position) const622 bool MemoryStreamBase::GetPosition(size_t *position) const {
623 if (position)
624 *position = seek_position_;
625 return true;
626 }
627
GetSize(size_t * size) const628 bool MemoryStreamBase::GetSize(size_t *size) const {
629 if (size)
630 *size = data_length_;
631 return true;
632 }
633
GetAvailable(size_t * size) const634 bool MemoryStreamBase::GetAvailable(size_t *size) const {
635 if (size)
636 *size = data_length_ - seek_position_;
637 return true;
638 }
639
ReserveSize(size_t size)640 bool MemoryStreamBase::ReserveSize(size_t size) {
641 return (SR_SUCCESS == DoReserve(size, NULL));
642 }
643
DoReserve(size_t size,int * error)644 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
645 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
646 }
647
648 ///////////////////////////////////////////////////////////////////////////////
649
MemoryStream()650 MemoryStream::MemoryStream()
651 : buffer_alloc_(NULL) {
652 }
653
MemoryStream(const char * data)654 MemoryStream::MemoryStream(const char* data)
655 : buffer_alloc_(NULL) {
656 SetData(data, strlen(data));
657 }
658
MemoryStream(const void * data,size_t length)659 MemoryStream::MemoryStream(const void* data, size_t length)
660 : buffer_alloc_(NULL) {
661 SetData(data, length);
662 }
663
~MemoryStream()664 MemoryStream::~MemoryStream() {
665 delete [] buffer_alloc_;
666 }
667
SetData(const void * data,size_t length)668 void MemoryStream::SetData(const void* data, size_t length) {
669 data_length_ = buffer_length_ = length;
670 delete [] buffer_alloc_;
671 buffer_alloc_ = new char[buffer_length_ + kAlignment];
672 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
673 memcpy(buffer_, data, data_length_);
674 seek_position_ = 0;
675 }
676
DoReserve(size_t size,int * error)677 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
678 if (buffer_length_ >= size)
679 return SR_SUCCESS;
680
681 if (char* new_buffer_alloc = new char[size + kAlignment]) {
682 char* new_buffer = reinterpret_cast<char*>(
683 ALIGNP(new_buffer_alloc, kAlignment));
684 memcpy(new_buffer, buffer_, data_length_);
685 delete [] buffer_alloc_;
686 buffer_alloc_ = new_buffer_alloc;
687 buffer_ = new_buffer;
688 buffer_length_ = size;
689 return SR_SUCCESS;
690 }
691
692 if (error) {
693 *error = ENOMEM;
694 }
695 return SR_ERROR;
696 }
697
698 ///////////////////////////////////////////////////////////////////////////////
699
ExternalMemoryStream()700 ExternalMemoryStream::ExternalMemoryStream() {
701 }
702
ExternalMemoryStream(void * data,size_t length)703 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
704 SetData(data, length);
705 }
706
~ExternalMemoryStream()707 ExternalMemoryStream::~ExternalMemoryStream() {
708 }
709
SetData(void * data,size_t length)710 void ExternalMemoryStream::SetData(void* data, size_t length) {
711 data_length_ = buffer_length_ = length;
712 buffer_ = static_cast<char*>(data);
713 seek_position_ = 0;
714 }
715
716 ///////////////////////////////////////////////////////////////////////////////
717 // FifoBuffer
718 ///////////////////////////////////////////////////////////////////////////////
719
FifoBuffer(size_t size)720 FifoBuffer::FifoBuffer(size_t size)
721 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
722 data_length_(0), read_position_(0), owner_(Thread::Current()) {
723 // all events are done on the owner_ thread
724 }
725
~FifoBuffer()726 FifoBuffer::~FifoBuffer() {
727 }
728
GetBuffered(size_t * size) const729 bool FifoBuffer::GetBuffered(size_t* size) const {
730 CritScope cs(&crit_);
731 *size = data_length_;
732 return true;
733 }
734
SetCapacity(size_t size)735 bool FifoBuffer::SetCapacity(size_t size) {
736 CritScope cs(&crit_);
737 if (data_length_ > size) {
738 return false;
739 }
740
741 if (size != buffer_length_) {
742 char* buffer = new char[size];
743 const size_t copy = data_length_;
744 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
745 memcpy(buffer, &buffer_[read_position_], tail_copy);
746 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
747 buffer_.reset(buffer);
748 read_position_ = 0;
749 buffer_length_ = size;
750 }
751 return true;
752 }
753
GetState() const754 StreamState FifoBuffer::GetState() const {
755 return state_;
756 }
757
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)758 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
759 size_t* bytes_read, int* error) {
760 CritScope cs(&crit_);
761 const size_t available = data_length_;
762 if (0 == available) {
763 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
764 }
765
766 const bool was_writable = data_length_ < buffer_length_;
767 const size_t copy = _min(bytes, available);
768 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
769 char* const p = static_cast<char*>(buffer);
770 memcpy(p, &buffer_[read_position_], tail_copy);
771 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
772 read_position_ = (read_position_ + copy) % buffer_length_;
773 data_length_ -= copy;
774 if (bytes_read) {
775 *bytes_read = copy;
776 }
777 // if we were full before, and now we're not, post an event
778 if (!was_writable && copy > 0) {
779 PostEvent(owner_, SE_WRITE, 0);
780 }
781
782 return SR_SUCCESS;
783 }
784
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)785 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
786 size_t* bytes_written, int* error) {
787 CritScope cs(&crit_);
788 if (state_ == SS_CLOSED) {
789 return SR_EOS;
790 }
791
792 const size_t available = buffer_length_ - data_length_;
793 if (0 == available) {
794 return SR_BLOCK;
795 }
796
797 const bool was_readable = (data_length_ > 0);
798 const size_t write_position = (read_position_ + data_length_)
799 % buffer_length_;
800 const size_t copy = _min(bytes, available);
801 const size_t tail_copy = _min(copy, buffer_length_ - write_position);
802 const char* const p = static_cast<const char*>(buffer);
803 memcpy(&buffer_[write_position], p, tail_copy);
804 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
805 data_length_ += copy;
806 if (bytes_written) {
807 *bytes_written = copy;
808 }
809 // if we didn't have any data to read before, and now we do, post an event
810 if (!was_readable && copy > 0) {
811 PostEvent(owner_, SE_READ, 0);
812 }
813
814 return SR_SUCCESS;
815 }
816
Close()817 void FifoBuffer::Close() {
818 CritScope cs(&crit_);
819 state_ = SS_CLOSED;
820 }
821
GetReadData(size_t * size)822 const void* FifoBuffer::GetReadData(size_t* size) {
823 CritScope cs(&crit_);
824 *size = (read_position_ + data_length_ <= buffer_length_) ?
825 data_length_ : buffer_length_ - read_position_;
826 return &buffer_[read_position_];
827 }
828
ConsumeReadData(size_t size)829 void FifoBuffer::ConsumeReadData(size_t size) {
830 CritScope cs(&crit_);
831 ASSERT(size <= data_length_);
832 const bool was_writable = data_length_ < buffer_length_;
833 read_position_ = (read_position_ + size) % buffer_length_;
834 data_length_ -= size;
835 if (!was_writable && size > 0) {
836 PostEvent(owner_, SE_WRITE, 0);
837 }
838 }
839
GetWriteBuffer(size_t * size)840 void* FifoBuffer::GetWriteBuffer(size_t* size) {
841 CritScope cs(&crit_);
842 if (state_ == SS_CLOSED) {
843 return NULL;
844 }
845
846 // if empty, reset the write position to the beginning, so we can get
847 // the biggest possible block
848 if (data_length_ == 0) {
849 read_position_ = 0;
850 }
851
852 const size_t write_position = (read_position_ + data_length_)
853 % buffer_length_;
854 *size = (write_position >= read_position_) ?
855 buffer_length_ - write_position : read_position_ - write_position;
856 return &buffer_[write_position];
857 }
858
ConsumeWriteBuffer(size_t size)859 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
860 CritScope cs(&crit_);
861 ASSERT(size <= buffer_length_ - data_length_);
862 const bool was_readable = (data_length_ > 0);
863 data_length_ += size;
864 if (!was_readable && size > 0) {
865 PostEvent(owner_, SE_READ, 0);
866 }
867 }
868
869 ///////////////////////////////////////////////////////////////////////////////
870 // LoggingAdapter
871 ///////////////////////////////////////////////////////////////////////////////
872
LoggingAdapter(StreamInterface * stream,LoggingSeverity level,const std::string & label,bool hex_mode)873 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
874 const std::string& label, bool hex_mode)
875 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode)
876 {
877 set_label(label);
878 }
879
set_label(const std::string & label)880 void LoggingAdapter::set_label(const std::string& label) {
881 label_.assign("[");
882 label_.append(label);
883 label_.append("]");
884 }
885
Read(void * buffer,size_t buffer_len,size_t * read,int * error)886 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
887 size_t* read, int* error) {
888 size_t local_read; if (!read) read = &local_read;
889 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
890 error);
891 if (result == SR_SUCCESS) {
892 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
893 }
894 return result;
895 }
896
Write(const void * data,size_t data_len,size_t * written,int * error)897 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
898 size_t* written, int* error) {
899 size_t local_written; if (!written) written = &local_written;
900 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
901 error);
902 if (result == SR_SUCCESS) {
903 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
904 &lms_);
905 }
906 return result;
907 }
908
Close()909 void LoggingAdapter::Close() {
910 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
911 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
912 LOG_V(level_) << label_ << " Closed locally";
913 StreamAdapterInterface::Close();
914 }
915
OnEvent(StreamInterface * stream,int events,int err)916 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
917 if (events & SE_OPEN) {
918 LOG_V(level_) << label_ << " Open";
919 } else if (events & SE_CLOSE) {
920 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
921 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
922 LOG_V(level_) << label_ << " Closed with error: " << err;
923 }
924 StreamAdapterInterface::OnEvent(stream, events, err);
925 }
926
927 ///////////////////////////////////////////////////////////////////////////////
928 // StringStream - Reads/Writes to an external std::string
929 ///////////////////////////////////////////////////////////////////////////////
930
StringStream(std::string & str)931 StringStream::StringStream(std::string& str)
932 : str_(str), read_pos_(0), read_only_(false)
933 {
934 }
935
StringStream(const std::string & str)936 StringStream::StringStream(const std::string& str)
937 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true)
938 {
939 }
940
GetState() const941 StreamState StringStream::GetState() const {
942 return SS_OPEN;
943 }
944
Read(void * buffer,size_t buffer_len,size_t * read,int * error)945 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
946 size_t* read, int* error) {
947 size_t available = _min(buffer_len, str_.size() - read_pos_);
948 if (!available)
949 return SR_EOS;
950 memcpy(buffer, str_.data() + read_pos_, available);
951 read_pos_ += available;
952 if (read)
953 *read = available;
954 return SR_SUCCESS;
955 }
956
Write(const void * data,size_t data_len,size_t * written,int * error)957 StreamResult StringStream::Write(const void* data, size_t data_len,
958 size_t* written, int* error) {
959 if (read_only_) {
960 if (error) {
961 *error = -1;
962 }
963 return SR_ERROR;
964 }
965 str_.append(static_cast<const char*>(data),
966 static_cast<const char*>(data) + data_len);
967 if (written)
968 *written = data_len;
969 return SR_SUCCESS;
970 }
971
Close()972 void StringStream::Close() {
973 }
974
SetPosition(size_t position)975 bool StringStream::SetPosition(size_t position) {
976 if (position > str_.size())
977 return false;
978 read_pos_ = position;
979 return true;
980 }
981
GetPosition(size_t * position) const982 bool StringStream::GetPosition(size_t* position) const {
983 if (position)
984 *position = read_pos_;
985 return true;
986 }
987
GetSize(size_t * size) const988 bool StringStream::GetSize(size_t* size) const {
989 if (size)
990 *size = str_.size();
991 return true;
992 }
993
GetAvailable(size_t * size) const994 bool StringStream::GetAvailable(size_t* size) const {
995 if (size)
996 *size = str_.size() - read_pos_;
997 return true;
998 }
999
ReserveSize(size_t size)1000 bool StringStream::ReserveSize(size_t size) {
1001 if (read_only_)
1002 return false;
1003 str_.reserve(size);
1004 return true;
1005 }
1006
1007 ///////////////////////////////////////////////////////////////////////////////
1008 // StreamReference
1009 ///////////////////////////////////////////////////////////////////////////////
1010
StreamReference(StreamInterface * stream)1011 StreamReference::StreamReference(StreamInterface* stream)
1012 : StreamAdapterInterface(stream, false) {
1013 // owner set to false so the destructor does not free the stream.
1014 stream_ref_count_ = new StreamRefCount(stream);
1015 }
1016
NewReference()1017 StreamInterface* StreamReference::NewReference() {
1018 stream_ref_count_->AddReference();
1019 return new StreamReference(stream_ref_count_, stream());
1020 }
1021
~StreamReference()1022 StreamReference::~StreamReference() {
1023 stream_ref_count_->Release();
1024 }
1025
StreamReference(StreamRefCount * stream_ref_count,StreamInterface * stream)1026 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1027 StreamInterface* stream)
1028 : StreamAdapterInterface(stream, false),
1029 stream_ref_count_(stream_ref_count) {
1030 }
1031
1032 ///////////////////////////////////////////////////////////////////////////////
1033
Flow(StreamInterface * source,char * buffer,size_t buffer_len,StreamInterface * sink,size_t * data_len)1034 StreamResult Flow(StreamInterface* source,
1035 char* buffer, size_t buffer_len,
1036 StreamInterface* sink,
1037 size_t* data_len /* = NULL */) {
1038 ASSERT(buffer_len > 0);
1039
1040 StreamResult result;
1041 size_t count, read_pos, write_pos;
1042 if (data_len) {
1043 read_pos = *data_len;
1044 } else {
1045 read_pos = 0;
1046 }
1047
1048 bool end_of_stream = false;
1049 do {
1050 // Read until buffer is full, end of stream, or error
1051 while (!end_of_stream && (read_pos < buffer_len)) {
1052 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1053 &count, NULL);
1054 if (result == SR_EOS) {
1055 end_of_stream = true;
1056 } else if (result != SR_SUCCESS) {
1057 if (data_len) {
1058 *data_len = read_pos;
1059 }
1060 return result;
1061 } else {
1062 read_pos += count;
1063 }
1064 }
1065
1066 // Write until buffer is empty, or error (including end of stream)
1067 write_pos = 0;
1068 while (write_pos < read_pos) {
1069 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1070 &count, NULL);
1071 if (result != SR_SUCCESS) {
1072 if (data_len) {
1073 *data_len = read_pos - write_pos;
1074 if (write_pos > 0) {
1075 memmove(buffer, buffer + write_pos, *data_len);
1076 }
1077 }
1078 return result;
1079 }
1080 write_pos += count;
1081 }
1082
1083 read_pos = 0;
1084 } while (!end_of_stream);
1085
1086 if (data_len) {
1087 *data_len = 0;
1088 }
1089 return SR_SUCCESS;
1090 }
1091
1092 ///////////////////////////////////////////////////////////////////////////////
1093
1094 } // namespace talk_base
1095