1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #if defined(WEBRTC_POSIX)
12 #include <sys/file.h>
13 #endif // WEBRTC_POSIX
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <errno.h>
17 #include <string>
18 #include "webrtc/base/basictypes.h"
19 #include "webrtc/base/common.h"
20 #include "webrtc/base/logging.h"
21 #include "webrtc/base/messagequeue.h"
22 #include "webrtc/base/stream.h"
23 #include "webrtc/base/stringencode.h"
24 #include "webrtc/base/stringutils.h"
25 #include "webrtc/base/thread.h"
26 #include "webrtc/base/timeutils.h"
27
28 #if defined(WEBRTC_WIN)
29 #include "webrtc/base/win32.h"
30 #define fileno _fileno
31 #endif
32
33 namespace rtc {
34
35 ///////////////////////////////////////////////////////////////////////////////
36 // StreamInterface
37 ///////////////////////////////////////////////////////////////////////////////
~StreamInterface()38 StreamInterface::~StreamInterface() {
39 }
40
WriteAll(const void * data,size_t data_len,size_t * written,int * error)41 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
42 size_t* written, int* error) {
43 StreamResult result = SR_SUCCESS;
44 size_t total_written = 0, current_written;
45 while (total_written < data_len) {
46 result = Write(static_cast<const char*>(data) + total_written,
47 data_len - total_written, ¤t_written, error);
48 if (result != SR_SUCCESS)
49 break;
50 total_written += current_written;
51 }
52 if (written)
53 *written = total_written;
54 return result;
55 }
56
ReadAll(void * buffer,size_t buffer_len,size_t * read,int * error)57 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
58 size_t* read, int* error) {
59 StreamResult result = SR_SUCCESS;
60 size_t total_read = 0, current_read;
61 while (total_read < buffer_len) {
62 result = Read(static_cast<char*>(buffer) + total_read,
63 buffer_len - total_read, ¤t_read, error);
64 if (result != SR_SUCCESS)
65 break;
66 total_read += current_read;
67 }
68 if (read)
69 *read = total_read;
70 return result;
71 }
72
ReadLine(std::string * line)73 StreamResult StreamInterface::ReadLine(std::string* line) {
74 line->clear();
75 StreamResult result = SR_SUCCESS;
76 while (true) {
77 char ch;
78 result = Read(&ch, sizeof(ch), NULL, NULL);
79 if (result != SR_SUCCESS) {
80 break;
81 }
82 if (ch == '\n') {
83 break;
84 }
85 line->push_back(ch);
86 }
87 if (!line->empty()) { // give back the line we've collected so far with
88 result = SR_SUCCESS; // a success code. Otherwise return the last code
89 }
90 return result;
91 }
92
PostEvent(Thread * t,int events,int err)93 void StreamInterface::PostEvent(Thread* t, int events, int err) {
94 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
95 }
96
PostEvent(int events,int err)97 void StreamInterface::PostEvent(int events, int err) {
98 PostEvent(Thread::Current(), events, err);
99 }
100
StreamInterface()101 StreamInterface::StreamInterface() {
102 }
103
OnMessage(Message * msg)104 void StreamInterface::OnMessage(Message* msg) {
105 if (MSG_POST_EVENT == msg->message_id) {
106 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
107 SignalEvent(this, pe->events, pe->error);
108 delete msg->pdata;
109 }
110 }
111
112 ///////////////////////////////////////////////////////////////////////////////
113 // StreamAdapterInterface
114 ///////////////////////////////////////////////////////////////////////////////
115
StreamAdapterInterface(StreamInterface * stream,bool owned)116 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
117 bool owned)
118 : stream_(stream), owned_(owned) {
119 if (NULL != stream_)
120 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
121 }
122
Attach(StreamInterface * stream,bool owned)123 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
124 if (NULL != stream_)
125 stream_->SignalEvent.disconnect(this);
126 if (owned_)
127 delete stream_;
128 stream_ = stream;
129 owned_ = owned;
130 if (NULL != stream_)
131 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
132 }
133
Detach()134 StreamInterface* StreamAdapterInterface::Detach() {
135 if (NULL != stream_)
136 stream_->SignalEvent.disconnect(this);
137 StreamInterface* stream = stream_;
138 stream_ = NULL;
139 return stream;
140 }
141
~StreamAdapterInterface()142 StreamAdapterInterface::~StreamAdapterInterface() {
143 if (owned_)
144 delete stream_;
145 }
146
147 ///////////////////////////////////////////////////////////////////////////////
148 // StreamTap
149 ///////////////////////////////////////////////////////////////////////////////
150
StreamTap(StreamInterface * stream,StreamInterface * tap)151 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
152 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
153 tap_error_(0) {
154 AttachTap(tap);
155 }
156
AttachTap(StreamInterface * tap)157 void StreamTap::AttachTap(StreamInterface* tap) {
158 tap_.reset(tap);
159 }
160
DetachTap()161 StreamInterface* StreamTap::DetachTap() {
162 return tap_.release();
163 }
164
GetTapResult(int * error)165 StreamResult StreamTap::GetTapResult(int* error) {
166 if (error) {
167 *error = tap_error_;
168 }
169 return tap_result_;
170 }
171
Read(void * buffer,size_t buffer_len,size_t * read,int * error)172 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
173 size_t* read, int* error) {
174 size_t backup_read;
175 if (!read) {
176 read = &backup_read;
177 }
178 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
179 read, error);
180 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
181 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
182 }
183 return res;
184 }
185
Write(const void * data,size_t data_len,size_t * written,int * error)186 StreamResult StreamTap::Write(const void* data, size_t data_len,
187 size_t* written, int* error) {
188 size_t backup_written;
189 if (!written) {
190 written = &backup_written;
191 }
192 StreamResult res = StreamAdapterInterface::Write(data, data_len,
193 written, error);
194 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
195 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
196 }
197 return res;
198 }
199
200 ///////////////////////////////////////////////////////////////////////////////
201 // StreamSegment
202 ///////////////////////////////////////////////////////////////////////////////
203
StreamSegment(StreamInterface * stream)204 StreamSegment::StreamSegment(StreamInterface* stream)
205 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
206 length_(SIZE_UNKNOWN) {
207 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
208 stream->GetPosition(&start_);
209 }
210
StreamSegment(StreamInterface * stream,size_t length)211 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
212 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
213 length_(length) {
214 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
215 stream->GetPosition(&start_);
216 }
217
Read(void * buffer,size_t buffer_len,size_t * read,int * error)218 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
219 size_t* read, int* error) {
220 if (SIZE_UNKNOWN != length_) {
221 if (pos_ >= length_)
222 return SR_EOS;
223 buffer_len = _min(buffer_len, length_ - pos_);
224 }
225 size_t backup_read;
226 if (!read) {
227 read = &backup_read;
228 }
229 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
230 read, error);
231 if (SR_SUCCESS == result) {
232 pos_ += *read;
233 }
234 return result;
235 }
236
SetPosition(size_t position)237 bool StreamSegment::SetPosition(size_t position) {
238 if (SIZE_UNKNOWN == start_)
239 return false; // Not seekable
240 if ((SIZE_UNKNOWN != length_) && (position > length_))
241 return false; // Seek past end of segment
242 if (!StreamAdapterInterface::SetPosition(start_ + position))
243 return false;
244 pos_ = position;
245 return true;
246 }
247
GetPosition(size_t * position) const248 bool StreamSegment::GetPosition(size_t* position) const {
249 if (SIZE_UNKNOWN == start_)
250 return false; // Not seekable
251 if (!StreamAdapterInterface::GetPosition(position))
252 return false;
253 if (position) {
254 ASSERT(*position >= start_);
255 *position -= start_;
256 }
257 return true;
258 }
259
GetSize(size_t * size) const260 bool StreamSegment::GetSize(size_t* size) const {
261 if (!StreamAdapterInterface::GetSize(size))
262 return false;
263 if (size) {
264 if (SIZE_UNKNOWN != start_) {
265 ASSERT(*size >= start_);
266 *size -= start_;
267 }
268 if (SIZE_UNKNOWN != length_) {
269 *size = _min(*size, length_);
270 }
271 }
272 return true;
273 }
274
GetAvailable(size_t * size) const275 bool StreamSegment::GetAvailable(size_t* size) const {
276 if (!StreamAdapterInterface::GetAvailable(size))
277 return false;
278 if (size && (SIZE_UNKNOWN != length_))
279 *size = _min(*size, length_ - pos_);
280 return true;
281 }
282
283 ///////////////////////////////////////////////////////////////////////////////
284 // NullStream
285 ///////////////////////////////////////////////////////////////////////////////
286
NullStream()287 NullStream::NullStream() {
288 }
289
~NullStream()290 NullStream::~NullStream() {
291 }
292
GetState() const293 StreamState NullStream::GetState() const {
294 return SS_OPEN;
295 }
296
Read(void * buffer,size_t buffer_len,size_t * read,int * error)297 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
298 size_t* read, int* error) {
299 if (error) *error = -1;
300 return SR_ERROR;
301 }
302
Write(const void * data,size_t data_len,size_t * written,int * error)303 StreamResult NullStream::Write(const void* data, size_t data_len,
304 size_t* written, int* error) {
305 if (written) *written = data_len;
306 return SR_SUCCESS;
307 }
308
Close()309 void NullStream::Close() {
310 }
311
312 ///////////////////////////////////////////////////////////////////////////////
313 // FileStream
314 ///////////////////////////////////////////////////////////////////////////////
315
FileStream()316 FileStream::FileStream() : file_(NULL) {
317 }
318
~FileStream()319 FileStream::~FileStream() {
320 FileStream::Close();
321 }
322
Open(const std::string & filename,const char * mode,int * error)323 bool FileStream::Open(const std::string& filename, const char* mode,
324 int* error) {
325 Close();
326 #if defined(WEBRTC_WIN)
327 std::wstring wfilename;
328 if (Utf8ToWindowsFilename(filename, &wfilename)) {
329 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
330 } else {
331 if (error) {
332 *error = -1;
333 return false;
334 }
335 }
336 #else
337 file_ = fopen(filename.c_str(), mode);
338 #endif
339 if (!file_ && error) {
340 *error = errno;
341 }
342 return (file_ != NULL);
343 }
344
OpenShare(const std::string & filename,const char * mode,int shflag,int * error)345 bool FileStream::OpenShare(const std::string& filename, const char* mode,
346 int shflag, int* error) {
347 Close();
348 #if defined(WEBRTC_WIN)
349 std::wstring wfilename;
350 if (Utf8ToWindowsFilename(filename, &wfilename)) {
351 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
352 if (!file_ && error) {
353 *error = errno;
354 return false;
355 }
356 return file_ != NULL;
357 } else {
358 if (error) {
359 *error = -1;
360 }
361 return false;
362 }
363 #else
364 return Open(filename, mode, error);
365 #endif
366 }
367
DisableBuffering()368 bool FileStream::DisableBuffering() {
369 if (!file_)
370 return false;
371 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
372 }
373
GetState() const374 StreamState FileStream::GetState() const {
375 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
376 }
377
Read(void * buffer,size_t buffer_len,size_t * read,int * error)378 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
379 size_t* read, int* error) {
380 if (!file_)
381 return SR_EOS;
382 size_t result = fread(buffer, 1, buffer_len, file_);
383 if ((result == 0) && (buffer_len > 0)) {
384 if (feof(file_))
385 return SR_EOS;
386 if (error)
387 *error = errno;
388 return SR_ERROR;
389 }
390 if (read)
391 *read = result;
392 return SR_SUCCESS;
393 }
394
Write(const void * data,size_t data_len,size_t * written,int * error)395 StreamResult FileStream::Write(const void* data, size_t data_len,
396 size_t* written, int* error) {
397 if (!file_)
398 return SR_EOS;
399 size_t result = fwrite(data, 1, data_len, file_);
400 if ((result == 0) && (data_len > 0)) {
401 if (error)
402 *error = errno;
403 return SR_ERROR;
404 }
405 if (written)
406 *written = result;
407 return SR_SUCCESS;
408 }
409
Close()410 void FileStream::Close() {
411 if (file_) {
412 DoClose();
413 file_ = NULL;
414 }
415 }
416
SetPosition(size_t position)417 bool FileStream::SetPosition(size_t position) {
418 if (!file_)
419 return false;
420 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
421 }
422
GetPosition(size_t * position) const423 bool FileStream::GetPosition(size_t* position) const {
424 ASSERT(NULL != position);
425 if (!file_)
426 return false;
427 long result = ftell(file_);
428 if (result < 0)
429 return false;
430 if (position)
431 *position = result;
432 return true;
433 }
434
GetSize(size_t * size) const435 bool FileStream::GetSize(size_t* size) const {
436 ASSERT(NULL != size);
437 if (!file_)
438 return false;
439 struct stat file_stats;
440 if (fstat(fileno(file_), &file_stats) != 0)
441 return false;
442 if (size)
443 *size = file_stats.st_size;
444 return true;
445 }
446
GetAvailable(size_t * size) const447 bool FileStream::GetAvailable(size_t* size) const {
448 ASSERT(NULL != size);
449 if (!GetSize(size))
450 return false;
451 long result = ftell(file_);
452 if (result < 0)
453 return false;
454 if (size)
455 *size -= result;
456 return true;
457 }
458
ReserveSize(size_t size)459 bool FileStream::ReserveSize(size_t size) {
460 // TODO: extend the file to the proper length
461 return true;
462 }
463
GetSize(const std::string & filename,size_t * size)464 bool FileStream::GetSize(const std::string& filename, size_t* size) {
465 struct stat file_stats;
466 if (stat(filename.c_str(), &file_stats) != 0)
467 return false;
468 *size = file_stats.st_size;
469 return true;
470 }
471
Flush()472 bool FileStream::Flush() {
473 if (file_) {
474 return (0 == fflush(file_));
475 }
476 // try to flush empty file?
477 ASSERT(false);
478 return false;
479 }
480
481 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
482
TryLock()483 bool FileStream::TryLock() {
484 if (file_ == NULL) {
485 // Stream not open.
486 ASSERT(false);
487 return false;
488 }
489
490 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
491 }
492
Unlock()493 bool FileStream::Unlock() {
494 if (file_ == NULL) {
495 // Stream not open.
496 ASSERT(false);
497 return false;
498 }
499
500 return flock(fileno(file_), LOCK_UN) == 0;
501 }
502
503 #endif
504
DoClose()505 void FileStream::DoClose() {
506 fclose(file_);
507 }
508
CircularFileStream(size_t max_size)509 CircularFileStream::CircularFileStream(size_t max_size)
510 : max_write_size_(max_size),
511 position_(0),
512 marked_position_(max_size / 2),
513 last_write_position_(0),
514 read_segment_(READ_LATEST),
515 read_segment_available_(0) {
516 }
517
Open(const std::string & filename,const char * mode,int * error)518 bool CircularFileStream::Open(
519 const std::string& filename, const char* mode, int* error) {
520 if (!FileStream::Open(filename.c_str(), mode, error))
521 return false;
522
523 if (strchr(mode, "r") != NULL) { // Opened in read mode.
524 // Check if the buffer has been overwritten and determine how to read the
525 // log in time sequence.
526 size_t file_size;
527 GetSize(&file_size);
528 if (file_size == position_) {
529 // The buffer has not been overwritten yet. Read 0 .. file_size
530 read_segment_ = READ_LATEST;
531 read_segment_available_ = file_size;
532 } else {
533 // The buffer has been over written. There are three segments: The first
534 // one is 0 .. marked_position_, which is the marked earliest log. The
535 // second one is position_ .. file_size, which is the middle log. The
536 // last one is marked_position_ .. position_, which is the latest log.
537 read_segment_ = READ_MARKED;
538 read_segment_available_ = marked_position_;
539 last_write_position_ = position_;
540 }
541
542 // Read from the beginning.
543 position_ = 0;
544 SetPosition(position_);
545 }
546
547 return true;
548 }
549
Read(void * buffer,size_t buffer_len,size_t * read,int * error)550 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
551 size_t* read, int* error) {
552 if (read_segment_available_ == 0) {
553 size_t file_size;
554 switch (read_segment_) {
555 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
556 read_segment_ = READ_MIDDLE;
557 position_ = last_write_position_;
558 SetPosition(position_);
559 GetSize(&file_size);
560 read_segment_available_ = file_size - position_;
561 break;
562
563 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
564 read_segment_ = READ_LATEST;
565 position_ = marked_position_;
566 SetPosition(position_);
567 read_segment_available_ = last_write_position_ - position_;
568 break;
569
570 default: // Finished READ_LATEST and return EOS.
571 return rtc::SR_EOS;
572 }
573 }
574
575 size_t local_read;
576 if (!read) read = &local_read;
577
578 size_t to_read = rtc::_min(buffer_len, read_segment_available_);
579 rtc::StreamResult result
580 = rtc::FileStream::Read(buffer, to_read, read, error);
581 if (result == rtc::SR_SUCCESS) {
582 read_segment_available_ -= *read;
583 position_ += *read;
584 }
585 return result;
586 }
587
Write(const void * data,size_t data_len,size_t * written,int * error)588 StreamResult CircularFileStream::Write(const void* data, size_t data_len,
589 size_t* written, int* error) {
590 if (position_ >= max_write_size_) {
591 ASSERT(position_ == max_write_size_);
592 position_ = marked_position_;
593 SetPosition(position_);
594 }
595
596 size_t local_written;
597 if (!written) written = &local_written;
598
599 size_t to_eof = max_write_size_ - position_;
600 size_t to_write = rtc::_min(data_len, to_eof);
601 rtc::StreamResult result
602 = rtc::FileStream::Write(data, to_write, written, error);
603 if (result == rtc::SR_SUCCESS) {
604 position_ += *written;
605 }
606 return result;
607 }
608
~AsyncWriteStream()609 AsyncWriteStream::~AsyncWriteStream() {
610 write_thread_->Clear(this, 0, NULL);
611 ClearBufferAndWrite();
612
613 CritScope cs(&crit_stream_);
614 stream_.reset();
615 }
616
617 // This is needed by some stream writers, such as RtpDumpWriter.
GetPosition(size_t * position) const618 bool AsyncWriteStream::GetPosition(size_t* position) const {
619 CritScope cs(&crit_stream_);
620 return stream_->GetPosition(position);
621 }
622
623 // This is needed by some stream writers, such as the plugin log writers.
Read(void * buffer,size_t buffer_len,size_t * read,int * error)624 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
625 size_t* read, int* error) {
626 CritScope cs(&crit_stream_);
627 return stream_->Read(buffer, buffer_len, read, error);
628 }
629
Close()630 void AsyncWriteStream::Close() {
631 if (state_ == SS_CLOSED) {
632 return;
633 }
634
635 write_thread_->Clear(this, 0, NULL);
636 ClearBufferAndWrite();
637
638 CritScope cs(&crit_stream_);
639 stream_->Close();
640 state_ = SS_CLOSED;
641 }
642
Write(const void * data,size_t data_len,size_t * written,int * error)643 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
644 size_t* written, int* error) {
645 if (state_ == SS_CLOSED) {
646 return SR_ERROR;
647 }
648
649 size_t previous_buffer_length = 0;
650 {
651 CritScope cs(&crit_buffer_);
652 previous_buffer_length = buffer_.length();
653 buffer_.AppendData(data, data_len);
654 }
655
656 if (previous_buffer_length == 0) {
657 // If there's stuff already in the buffer, then we already called
658 // Post and the write_thread_ hasn't pulled it out yet, so we
659 // don't need to re-Post.
660 write_thread_->Post(this, 0, NULL);
661 }
662 // Return immediately, assuming that it works.
663 if (written) {
664 *written = data_len;
665 }
666 return SR_SUCCESS;
667 }
668
OnMessage(rtc::Message * pmsg)669 void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
670 ClearBufferAndWrite();
671 }
672
Flush()673 bool AsyncWriteStream::Flush() {
674 if (state_ == SS_CLOSED) {
675 return false;
676 }
677
678 ClearBufferAndWrite();
679
680 CritScope cs(&crit_stream_);
681 return stream_->Flush();
682 }
683
ClearBufferAndWrite()684 void AsyncWriteStream::ClearBufferAndWrite() {
685 Buffer to_write;
686 {
687 CritScope cs_buffer(&crit_buffer_);
688 buffer_.TransferTo(&to_write);
689 }
690
691 if (to_write.length() > 0) {
692 CritScope cs(&crit_stream_);
693 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
694 }
695 }
696
697 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
698
699 // Have to identically rewrite the FileStream destructor or else it would call
700 // the base class's Close() instead of the sub-class's.
~POpenStream()701 POpenStream::~POpenStream() {
702 POpenStream::Close();
703 }
704
Open(const std::string & subcommand,const char * mode,int * error)705 bool POpenStream::Open(const std::string& subcommand,
706 const char* mode,
707 int* error) {
708 Close();
709 file_ = popen(subcommand.c_str(), mode);
710 if (file_ == NULL) {
711 if (error)
712 *error = errno;
713 return false;
714 }
715 return true;
716 }
717
OpenShare(const std::string & subcommand,const char * mode,int shflag,int * error)718 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
719 int shflag, int* error) {
720 return Open(subcommand, mode, error);
721 }
722
DoClose()723 void POpenStream::DoClose() {
724 wait_status_ = pclose(file_);
725 }
726
727 #endif
728
729 ///////////////////////////////////////////////////////////////////////////////
730 // MemoryStream
731 ///////////////////////////////////////////////////////////////////////////////
732
MemoryStreamBase()733 MemoryStreamBase::MemoryStreamBase()
734 : buffer_(NULL), buffer_length_(0), data_length_(0),
735 seek_position_(0) {
736 }
737
GetState() const738 StreamState MemoryStreamBase::GetState() const {
739 return SS_OPEN;
740 }
741
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)742 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
743 size_t* bytes_read, int* error) {
744 if (seek_position_ >= data_length_) {
745 return SR_EOS;
746 }
747 size_t available = data_length_ - seek_position_;
748 if (bytes > available) {
749 // Read partial buffer
750 bytes = available;
751 }
752 memcpy(buffer, &buffer_[seek_position_], bytes);
753 seek_position_ += bytes;
754 if (bytes_read) {
755 *bytes_read = bytes;
756 }
757 return SR_SUCCESS;
758 }
759
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)760 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
761 size_t* bytes_written, int* error) {
762 size_t available = buffer_length_ - seek_position_;
763 if (0 == available) {
764 // Increase buffer size to the larger of:
765 // a) new position rounded up to next 256 bytes
766 // b) double the previous length
767 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
768 buffer_length_ * 2);
769 StreamResult result = DoReserve(new_buffer_length, error);
770 if (SR_SUCCESS != result) {
771 return result;
772 }
773 ASSERT(buffer_length_ >= new_buffer_length);
774 available = buffer_length_ - seek_position_;
775 }
776
777 if (bytes > available) {
778 bytes = available;
779 }
780 memcpy(&buffer_[seek_position_], buffer, bytes);
781 seek_position_ += bytes;
782 if (data_length_ < seek_position_) {
783 data_length_ = seek_position_;
784 }
785 if (bytes_written) {
786 *bytes_written = bytes;
787 }
788 return SR_SUCCESS;
789 }
790
Close()791 void MemoryStreamBase::Close() {
792 // nothing to do
793 }
794
SetPosition(size_t position)795 bool MemoryStreamBase::SetPosition(size_t position) {
796 if (position > data_length_)
797 return false;
798 seek_position_ = position;
799 return true;
800 }
801
GetPosition(size_t * position) const802 bool MemoryStreamBase::GetPosition(size_t* position) const {
803 if (position)
804 *position = seek_position_;
805 return true;
806 }
807
GetSize(size_t * size) const808 bool MemoryStreamBase::GetSize(size_t* size) const {
809 if (size)
810 *size = data_length_;
811 return true;
812 }
813
GetAvailable(size_t * size) const814 bool MemoryStreamBase::GetAvailable(size_t* size) const {
815 if (size)
816 *size = data_length_ - seek_position_;
817 return true;
818 }
819
ReserveSize(size_t size)820 bool MemoryStreamBase::ReserveSize(size_t size) {
821 return (SR_SUCCESS == DoReserve(size, NULL));
822 }
823
DoReserve(size_t size,int * error)824 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
825 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
826 }
827
828 ///////////////////////////////////////////////////////////////////////////////
829
MemoryStream()830 MemoryStream::MemoryStream()
831 : buffer_alloc_(NULL) {
832 }
833
MemoryStream(const char * data)834 MemoryStream::MemoryStream(const char* data)
835 : buffer_alloc_(NULL) {
836 SetData(data, strlen(data));
837 }
838
MemoryStream(const void * data,size_t length)839 MemoryStream::MemoryStream(const void* data, size_t length)
840 : buffer_alloc_(NULL) {
841 SetData(data, length);
842 }
843
~MemoryStream()844 MemoryStream::~MemoryStream() {
845 delete [] buffer_alloc_;
846 }
847
SetData(const void * data,size_t length)848 void MemoryStream::SetData(const void* data, size_t length) {
849 data_length_ = buffer_length_ = length;
850 delete [] buffer_alloc_;
851 buffer_alloc_ = new char[buffer_length_ + kAlignment];
852 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
853 memcpy(buffer_, data, data_length_);
854 seek_position_ = 0;
855 }
856
DoReserve(size_t size,int * error)857 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
858 if (buffer_length_ >= size)
859 return SR_SUCCESS;
860
861 if (char* new_buffer_alloc = new char[size + kAlignment]) {
862 char* new_buffer = reinterpret_cast<char*>(
863 ALIGNP(new_buffer_alloc, kAlignment));
864 memcpy(new_buffer, buffer_, data_length_);
865 delete [] buffer_alloc_;
866 buffer_alloc_ = new_buffer_alloc;
867 buffer_ = new_buffer;
868 buffer_length_ = size;
869 return SR_SUCCESS;
870 }
871
872 if (error) {
873 *error = ENOMEM;
874 }
875 return SR_ERROR;
876 }
877
878 ///////////////////////////////////////////////////////////////////////////////
879
ExternalMemoryStream()880 ExternalMemoryStream::ExternalMemoryStream() {
881 }
882
ExternalMemoryStream(void * data,size_t length)883 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
884 SetData(data, length);
885 }
886
~ExternalMemoryStream()887 ExternalMemoryStream::~ExternalMemoryStream() {
888 }
889
SetData(void * data,size_t length)890 void ExternalMemoryStream::SetData(void* data, size_t length) {
891 data_length_ = buffer_length_ = length;
892 buffer_ = static_cast<char*>(data);
893 seek_position_ = 0;
894 }
895
896 ///////////////////////////////////////////////////////////////////////////////
897 // FifoBuffer
898 ///////////////////////////////////////////////////////////////////////////////
899
FifoBuffer(size_t size)900 FifoBuffer::FifoBuffer(size_t size)
901 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
902 data_length_(0), read_position_(0), owner_(Thread::Current()) {
903 // all events are done on the owner_ thread
904 }
905
FifoBuffer(size_t size,Thread * owner)906 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
907 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
908 data_length_(0), read_position_(0), owner_(owner) {
909 // all events are done on the owner_ thread
910 }
911
~FifoBuffer()912 FifoBuffer::~FifoBuffer() {
913 }
914
GetBuffered(size_t * size) const915 bool FifoBuffer::GetBuffered(size_t* size) const {
916 CritScope cs(&crit_);
917 *size = data_length_;
918 return true;
919 }
920
SetCapacity(size_t size)921 bool FifoBuffer::SetCapacity(size_t size) {
922 CritScope cs(&crit_);
923 if (data_length_ > size) {
924 return false;
925 }
926
927 if (size != buffer_length_) {
928 char* buffer = new char[size];
929 const size_t copy = data_length_;
930 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
931 memcpy(buffer, &buffer_[read_position_], tail_copy);
932 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
933 buffer_.reset(buffer);
934 read_position_ = 0;
935 buffer_length_ = size;
936 }
937 return true;
938 }
939
ReadOffset(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)940 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
941 size_t offset, size_t* bytes_read) {
942 CritScope cs(&crit_);
943 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
944 }
945
WriteOffset(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)946 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
947 size_t offset, size_t* bytes_written) {
948 CritScope cs(&crit_);
949 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
950 }
951
GetState() const952 StreamState FifoBuffer::GetState() const {
953 return state_;
954 }
955
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)956 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
957 size_t* bytes_read, int* error) {
958 CritScope cs(&crit_);
959 const bool was_writable = data_length_ < buffer_length_;
960 size_t copy = 0;
961 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
962
963 if (result == SR_SUCCESS) {
964 // If read was successful then adjust the read position and number of
965 // bytes buffered.
966 read_position_ = (read_position_ + copy) % buffer_length_;
967 data_length_ -= copy;
968 if (bytes_read) {
969 *bytes_read = copy;
970 }
971
972 // if we were full before, and now we're not, post an event
973 if (!was_writable && copy > 0) {
974 PostEvent(owner_, SE_WRITE, 0);
975 }
976 }
977 return result;
978 }
979
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)980 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
981 size_t* bytes_written, int* error) {
982 CritScope cs(&crit_);
983
984 const bool was_readable = (data_length_ > 0);
985 size_t copy = 0;
986 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
987
988 if (result == SR_SUCCESS) {
989 // If write was successful then adjust the number of readable bytes.
990 data_length_ += copy;
991 if (bytes_written) {
992 *bytes_written = copy;
993 }
994
995 // if we didn't have any data to read before, and now we do, post an event
996 if (!was_readable && copy > 0) {
997 PostEvent(owner_, SE_READ, 0);
998 }
999 }
1000 return result;
1001 }
1002
Close()1003 void FifoBuffer::Close() {
1004 CritScope cs(&crit_);
1005 state_ = SS_CLOSED;
1006 }
1007
GetReadData(size_t * size)1008 const void* FifoBuffer::GetReadData(size_t* size) {
1009 CritScope cs(&crit_);
1010 *size = (read_position_ + data_length_ <= buffer_length_) ?
1011 data_length_ : buffer_length_ - read_position_;
1012 return &buffer_[read_position_];
1013 }
1014
ConsumeReadData(size_t size)1015 void FifoBuffer::ConsumeReadData(size_t size) {
1016 CritScope cs(&crit_);
1017 ASSERT(size <= data_length_);
1018 const bool was_writable = data_length_ < buffer_length_;
1019 read_position_ = (read_position_ + size) % buffer_length_;
1020 data_length_ -= size;
1021 if (!was_writable && size > 0) {
1022 PostEvent(owner_, SE_WRITE, 0);
1023 }
1024 }
1025
GetWriteBuffer(size_t * size)1026 void* FifoBuffer::GetWriteBuffer(size_t* size) {
1027 CritScope cs(&crit_);
1028 if (state_ == SS_CLOSED) {
1029 return NULL;
1030 }
1031
1032 // if empty, reset the write position to the beginning, so we can get
1033 // the biggest possible block
1034 if (data_length_ == 0) {
1035 read_position_ = 0;
1036 }
1037
1038 const size_t write_position = (read_position_ + data_length_)
1039 % buffer_length_;
1040 *size = (write_position > read_position_ || data_length_ == 0) ?
1041 buffer_length_ - write_position : read_position_ - write_position;
1042 return &buffer_[write_position];
1043 }
1044
ConsumeWriteBuffer(size_t size)1045 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1046 CritScope cs(&crit_);
1047 ASSERT(size <= buffer_length_ - data_length_);
1048 const bool was_readable = (data_length_ > 0);
1049 data_length_ += size;
1050 if (!was_readable && size > 0) {
1051 PostEvent(owner_, SE_READ, 0);
1052 }
1053 }
1054
GetWriteRemaining(size_t * size) const1055 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1056 CritScope cs(&crit_);
1057 *size = buffer_length_ - data_length_;
1058 return true;
1059 }
1060
ReadOffsetLocked(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)1061 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1062 size_t bytes,
1063 size_t offset,
1064 size_t* bytes_read) {
1065 if (offset >= data_length_) {
1066 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1067 }
1068
1069 const size_t available = data_length_ - offset;
1070 const size_t read_position = (read_position_ + offset) % buffer_length_;
1071 const size_t copy = _min(bytes, available);
1072 const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1073 char* const p = static_cast<char*>(buffer);
1074 memcpy(p, &buffer_[read_position], tail_copy);
1075 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1076
1077 if (bytes_read) {
1078 *bytes_read = copy;
1079 }
1080 return SR_SUCCESS;
1081 }
1082
WriteOffsetLocked(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)1083 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1084 size_t bytes,
1085 size_t offset,
1086 size_t* bytes_written) {
1087 if (state_ == SS_CLOSED) {
1088 return SR_EOS;
1089 }
1090
1091 if (data_length_ + offset >= buffer_length_) {
1092 return SR_BLOCK;
1093 }
1094
1095 const size_t available = buffer_length_ - data_length_ - offset;
1096 const size_t write_position = (read_position_ + data_length_ + offset)
1097 % buffer_length_;
1098 const size_t copy = _min(bytes, available);
1099 const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1100 const char* const p = static_cast<const char*>(buffer);
1101 memcpy(&buffer_[write_position], p, tail_copy);
1102 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1103
1104 if (bytes_written) {
1105 *bytes_written = copy;
1106 }
1107 return SR_SUCCESS;
1108 }
1109
1110
1111
1112 ///////////////////////////////////////////////////////////////////////////////
1113 // LoggingAdapter
1114 ///////////////////////////////////////////////////////////////////////////////
1115
LoggingAdapter(StreamInterface * stream,LoggingSeverity level,const std::string & label,bool hex_mode)1116 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1117 const std::string& label, bool hex_mode)
1118 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1119 set_label(label);
1120 }
1121
set_label(const std::string & label)1122 void LoggingAdapter::set_label(const std::string& label) {
1123 label_.assign("[");
1124 label_.append(label);
1125 label_.append("]");
1126 }
1127
Read(void * buffer,size_t buffer_len,size_t * read,int * error)1128 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1129 size_t* read, int* error) {
1130 size_t local_read; if (!read) read = &local_read;
1131 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1132 error);
1133 if (result == SR_SUCCESS) {
1134 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1135 }
1136 return result;
1137 }
1138
Write(const void * data,size_t data_len,size_t * written,int * error)1139 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1140 size_t* written, int* error) {
1141 size_t local_written;
1142 if (!written) written = &local_written;
1143 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1144 error);
1145 if (result == SR_SUCCESS) {
1146 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1147 &lms_);
1148 }
1149 return result;
1150 }
1151
Close()1152 void LoggingAdapter::Close() {
1153 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1154 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1155 LOG_V(level_) << label_ << " Closed locally";
1156 StreamAdapterInterface::Close();
1157 }
1158
OnEvent(StreamInterface * stream,int events,int err)1159 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1160 if (events & SE_OPEN) {
1161 LOG_V(level_) << label_ << " Open";
1162 } else if (events & SE_CLOSE) {
1163 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1164 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1165 LOG_V(level_) << label_ << " Closed with error: " << err;
1166 }
1167 StreamAdapterInterface::OnEvent(stream, events, err);
1168 }
1169
1170 ///////////////////////////////////////////////////////////////////////////////
1171 // StringStream - Reads/Writes to an external std::string
1172 ///////////////////////////////////////////////////////////////////////////////
1173
StringStream(std::string & str)1174 StringStream::StringStream(std::string& str)
1175 : str_(str), read_pos_(0), read_only_(false) {
1176 }
1177
StringStream(const std::string & str)1178 StringStream::StringStream(const std::string& str)
1179 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1180 }
1181
GetState() const1182 StreamState StringStream::GetState() const {
1183 return SS_OPEN;
1184 }
1185
Read(void * buffer,size_t buffer_len,size_t * read,int * error)1186 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1187 size_t* read, int* error) {
1188 size_t available = _min(buffer_len, str_.size() - read_pos_);
1189 if (!available)
1190 return SR_EOS;
1191 memcpy(buffer, str_.data() + read_pos_, available);
1192 read_pos_ += available;
1193 if (read)
1194 *read = available;
1195 return SR_SUCCESS;
1196 }
1197
Write(const void * data,size_t data_len,size_t * written,int * error)1198 StreamResult StringStream::Write(const void* data, size_t data_len,
1199 size_t* written, int* error) {
1200 if (read_only_) {
1201 if (error) {
1202 *error = -1;
1203 }
1204 return SR_ERROR;
1205 }
1206 str_.append(static_cast<const char*>(data),
1207 static_cast<const char*>(data) + data_len);
1208 if (written)
1209 *written = data_len;
1210 return SR_SUCCESS;
1211 }
1212
Close()1213 void StringStream::Close() {
1214 }
1215
SetPosition(size_t position)1216 bool StringStream::SetPosition(size_t position) {
1217 if (position > str_.size())
1218 return false;
1219 read_pos_ = position;
1220 return true;
1221 }
1222
GetPosition(size_t * position) const1223 bool StringStream::GetPosition(size_t* position) const {
1224 if (position)
1225 *position = read_pos_;
1226 return true;
1227 }
1228
GetSize(size_t * size) const1229 bool StringStream::GetSize(size_t* size) const {
1230 if (size)
1231 *size = str_.size();
1232 return true;
1233 }
1234
GetAvailable(size_t * size) const1235 bool StringStream::GetAvailable(size_t* size) const {
1236 if (size)
1237 *size = str_.size() - read_pos_;
1238 return true;
1239 }
1240
ReserveSize(size_t size)1241 bool StringStream::ReserveSize(size_t size) {
1242 if (read_only_)
1243 return false;
1244 str_.reserve(size);
1245 return true;
1246 }
1247
1248 ///////////////////////////////////////////////////////////////////////////////
1249 // StreamReference
1250 ///////////////////////////////////////////////////////////////////////////////
1251
StreamReference(StreamInterface * stream)1252 StreamReference::StreamReference(StreamInterface* stream)
1253 : StreamAdapterInterface(stream, false) {
1254 // owner set to false so the destructor does not free the stream.
1255 stream_ref_count_ = new StreamRefCount(stream);
1256 }
1257
NewReference()1258 StreamInterface* StreamReference::NewReference() {
1259 stream_ref_count_->AddReference();
1260 return new StreamReference(stream_ref_count_, stream());
1261 }
1262
~StreamReference()1263 StreamReference::~StreamReference() {
1264 stream_ref_count_->Release();
1265 }
1266
StreamReference(StreamRefCount * stream_ref_count,StreamInterface * stream)1267 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1268 StreamInterface* stream)
1269 : StreamAdapterInterface(stream, false),
1270 stream_ref_count_(stream_ref_count) {
1271 }
1272
1273 ///////////////////////////////////////////////////////////////////////////////
1274
Flow(StreamInterface * source,char * buffer,size_t buffer_len,StreamInterface * sink,size_t * data_len)1275 StreamResult Flow(StreamInterface* source,
1276 char* buffer, size_t buffer_len,
1277 StreamInterface* sink,
1278 size_t* data_len /* = NULL */) {
1279 ASSERT(buffer_len > 0);
1280
1281 StreamResult result;
1282 size_t count, read_pos, write_pos;
1283 if (data_len) {
1284 read_pos = *data_len;
1285 } else {
1286 read_pos = 0;
1287 }
1288
1289 bool end_of_stream = false;
1290 do {
1291 // Read until buffer is full, end of stream, or error
1292 while (!end_of_stream && (read_pos < buffer_len)) {
1293 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1294 &count, NULL);
1295 if (result == SR_EOS) {
1296 end_of_stream = true;
1297 } else if (result != SR_SUCCESS) {
1298 if (data_len) {
1299 *data_len = read_pos;
1300 }
1301 return result;
1302 } else {
1303 read_pos += count;
1304 }
1305 }
1306
1307 // Write until buffer is empty, or error (including end of stream)
1308 write_pos = 0;
1309 while (write_pos < read_pos) {
1310 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1311 &count, NULL);
1312 if (result != SR_SUCCESS) {
1313 if (data_len) {
1314 *data_len = read_pos - write_pos;
1315 if (write_pos > 0) {
1316 memmove(buffer, buffer + write_pos, *data_len);
1317 }
1318 }
1319 return result;
1320 }
1321 write_pos += count;
1322 }
1323
1324 read_pos = 0;
1325 } while (!end_of_stream);
1326
1327 if (data_len) {
1328 *data_len = 0;
1329 }
1330 return SR_SUCCESS;
1331 }
1332
1333 ///////////////////////////////////////////////////////////////////////////////
1334
1335 } // namespace rtc
1336