1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #if defined(WEBRTC_POSIX)
12 #include <sys/file.h>
13 #endif // WEBRTC_POSIX
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <errno.h>
17
18 #include <algorithm>
19 #include <string>
20
21 #include "webrtc/base/basictypes.h"
22 #include "webrtc/base/common.h"
23 #include "webrtc/base/logging.h"
24 #include "webrtc/base/messagequeue.h"
25 #include "webrtc/base/stream.h"
26 #include "webrtc/base/stringencode.h"
27 #include "webrtc/base/stringutils.h"
28 #include "webrtc/base/thread.h"
29 #include "webrtc/base/timeutils.h"
30
31 #if defined(WEBRTC_WIN)
32 #include "webrtc/base/win32.h"
33 #define fileno _fileno
34 #endif
35
36 namespace rtc {
37
38 ///////////////////////////////////////////////////////////////////////////////
39 // StreamInterface
40 ///////////////////////////////////////////////////////////////////////////////
~StreamInterface()41 StreamInterface::~StreamInterface() {
42 }
43
WriteAll(const void * data,size_t data_len,size_t * written,int * error)44 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45 size_t* written, int* error) {
46 StreamResult result = SR_SUCCESS;
47 size_t total_written = 0, current_written;
48 while (total_written < data_len) {
49 result = Write(static_cast<const char*>(data) + total_written,
50 data_len - total_written, ¤t_written, error);
51 if (result != SR_SUCCESS)
52 break;
53 total_written += current_written;
54 }
55 if (written)
56 *written = total_written;
57 return result;
58 }
59
ReadAll(void * buffer,size_t buffer_len,size_t * read,int * error)60 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61 size_t* read, int* error) {
62 StreamResult result = SR_SUCCESS;
63 size_t total_read = 0, current_read;
64 while (total_read < buffer_len) {
65 result = Read(static_cast<char*>(buffer) + total_read,
66 buffer_len - total_read, ¤t_read, error);
67 if (result != SR_SUCCESS)
68 break;
69 total_read += current_read;
70 }
71 if (read)
72 *read = total_read;
73 return result;
74 }
75
ReadLine(std::string * line)76 StreamResult StreamInterface::ReadLine(std::string* line) {
77 line->clear();
78 StreamResult result = SR_SUCCESS;
79 while (true) {
80 char ch;
81 result = Read(&ch, sizeof(ch), NULL, NULL);
82 if (result != SR_SUCCESS) {
83 break;
84 }
85 if (ch == '\n') {
86 break;
87 }
88 line->push_back(ch);
89 }
90 if (!line->empty()) { // give back the line we've collected so far with
91 result = SR_SUCCESS; // a success code. Otherwise return the last code
92 }
93 return result;
94 }
95
PostEvent(Thread * t,int events,int err)96 void StreamInterface::PostEvent(Thread* t, int events, int err) {
97 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
98 }
99
PostEvent(int events,int err)100 void StreamInterface::PostEvent(int events, int err) {
101 PostEvent(Thread::Current(), events, err);
102 }
103
GetReadData(size_t * data_len)104 const void* StreamInterface::GetReadData(size_t* data_len) {
105 return NULL;
106 }
107
GetWriteBuffer(size_t * buf_len)108 void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
109 return NULL;
110 }
111
SetPosition(size_t position)112 bool StreamInterface::SetPosition(size_t position) {
113 return false;
114 }
115
GetPosition(size_t * position) const116 bool StreamInterface::GetPosition(size_t* position) const {
117 return false;
118 }
119
GetSize(size_t * size) const120 bool StreamInterface::GetSize(size_t* size) const {
121 return false;
122 }
123
GetAvailable(size_t * size) const124 bool StreamInterface::GetAvailable(size_t* size) const {
125 return false;
126 }
127
GetWriteRemaining(size_t * size) const128 bool StreamInterface::GetWriteRemaining(size_t* size) const {
129 return false;
130 }
131
Flush()132 bool StreamInterface::Flush() {
133 return false;
134 }
135
ReserveSize(size_t size)136 bool StreamInterface::ReserveSize(size_t size) {
137 return true;
138 }
139
StreamInterface()140 StreamInterface::StreamInterface() {
141 }
142
OnMessage(Message * msg)143 void StreamInterface::OnMessage(Message* msg) {
144 if (MSG_POST_EVENT == msg->message_id) {
145 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
146 SignalEvent(this, pe->events, pe->error);
147 delete msg->pdata;
148 }
149 }
150
151 ///////////////////////////////////////////////////////////////////////////////
152 // StreamAdapterInterface
153 ///////////////////////////////////////////////////////////////////////////////
154
StreamAdapterInterface(StreamInterface * stream,bool owned)155 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
156 bool owned)
157 : stream_(stream), owned_(owned) {
158 if (NULL != stream_)
159 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
160 }
161
GetState() const162 StreamState StreamAdapterInterface::GetState() const {
163 return stream_->GetState();
164 }
Read(void * buffer,size_t buffer_len,size_t * read,int * error)165 StreamResult StreamAdapterInterface::Read(void* buffer,
166 size_t buffer_len,
167 size_t* read,
168 int* error) {
169 return stream_->Read(buffer, buffer_len, read, error);
170 }
Write(const void * data,size_t data_len,size_t * written,int * error)171 StreamResult StreamAdapterInterface::Write(const void* data,
172 size_t data_len,
173 size_t* written,
174 int* error) {
175 return stream_->Write(data, data_len, written, error);
176 }
Close()177 void StreamAdapterInterface::Close() {
178 stream_->Close();
179 }
180
SetPosition(size_t position)181 bool StreamAdapterInterface::SetPosition(size_t position) {
182 return stream_->SetPosition(position);
183 }
184
GetPosition(size_t * position) const185 bool StreamAdapterInterface::GetPosition(size_t* position) const {
186 return stream_->GetPosition(position);
187 }
188
GetSize(size_t * size) const189 bool StreamAdapterInterface::GetSize(size_t* size) const {
190 return stream_->GetSize(size);
191 }
192
GetAvailable(size_t * size) const193 bool StreamAdapterInterface::GetAvailable(size_t* size) const {
194 return stream_->GetAvailable(size);
195 }
196
GetWriteRemaining(size_t * size) const197 bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
198 return stream_->GetWriteRemaining(size);
199 }
200
ReserveSize(size_t size)201 bool StreamAdapterInterface::ReserveSize(size_t size) {
202 return stream_->ReserveSize(size);
203 }
204
Flush()205 bool StreamAdapterInterface::Flush() {
206 return stream_->Flush();
207 }
208
Attach(StreamInterface * stream,bool owned)209 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
210 if (NULL != stream_)
211 stream_->SignalEvent.disconnect(this);
212 if (owned_)
213 delete stream_;
214 stream_ = stream;
215 owned_ = owned;
216 if (NULL != stream_)
217 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
218 }
219
Detach()220 StreamInterface* StreamAdapterInterface::Detach() {
221 if (NULL != stream_)
222 stream_->SignalEvent.disconnect(this);
223 StreamInterface* stream = stream_;
224 stream_ = NULL;
225 return stream;
226 }
227
~StreamAdapterInterface()228 StreamAdapterInterface::~StreamAdapterInterface() {
229 if (owned_)
230 delete stream_;
231 }
232
OnEvent(StreamInterface * stream,int events,int err)233 void StreamAdapterInterface::OnEvent(StreamInterface* stream,
234 int events,
235 int err) {
236 SignalEvent(this, events, err);
237 }
238
239 ///////////////////////////////////////////////////////////////////////////////
240 // StreamTap
241 ///////////////////////////////////////////////////////////////////////////////
242
StreamTap(StreamInterface * stream,StreamInterface * tap)243 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
244 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
245 tap_error_(0) {
246 AttachTap(tap);
247 }
248
249 StreamTap::~StreamTap() = default;
250
AttachTap(StreamInterface * tap)251 void StreamTap::AttachTap(StreamInterface* tap) {
252 tap_.reset(tap);
253 }
254
DetachTap()255 StreamInterface* StreamTap::DetachTap() {
256 return tap_.release();
257 }
258
GetTapResult(int * error)259 StreamResult StreamTap::GetTapResult(int* error) {
260 if (error) {
261 *error = tap_error_;
262 }
263 return tap_result_;
264 }
265
Read(void * buffer,size_t buffer_len,size_t * read,int * error)266 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
267 size_t* read, int* error) {
268 size_t backup_read;
269 if (!read) {
270 read = &backup_read;
271 }
272 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
273 read, error);
274 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
275 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
276 }
277 return res;
278 }
279
Write(const void * data,size_t data_len,size_t * written,int * error)280 StreamResult StreamTap::Write(const void* data, size_t data_len,
281 size_t* written, int* error) {
282 size_t backup_written;
283 if (!written) {
284 written = &backup_written;
285 }
286 StreamResult res = StreamAdapterInterface::Write(data, data_len,
287 written, error);
288 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
289 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
290 }
291 return res;
292 }
293
294 ///////////////////////////////////////////////////////////////////////////////
295 // NullStream
296 ///////////////////////////////////////////////////////////////////////////////
297
NullStream()298 NullStream::NullStream() {
299 }
300
~NullStream()301 NullStream::~NullStream() {
302 }
303
GetState() const304 StreamState NullStream::GetState() const {
305 return SS_OPEN;
306 }
307
Read(void * buffer,size_t buffer_len,size_t * read,int * error)308 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
309 size_t* read, int* error) {
310 if (error) *error = -1;
311 return SR_ERROR;
312 }
313
Write(const void * data,size_t data_len,size_t * written,int * error)314 StreamResult NullStream::Write(const void* data, size_t data_len,
315 size_t* written, int* error) {
316 if (written) *written = data_len;
317 return SR_SUCCESS;
318 }
319
Close()320 void NullStream::Close() {
321 }
322
323 ///////////////////////////////////////////////////////////////////////////////
324 // FileStream
325 ///////////////////////////////////////////////////////////////////////////////
326
FileStream()327 FileStream::FileStream() : file_(NULL) {
328 }
329
~FileStream()330 FileStream::~FileStream() {
331 FileStream::Close();
332 }
333
Open(const std::string & filename,const char * mode,int * error)334 bool FileStream::Open(const std::string& filename, const char* mode,
335 int* error) {
336 Close();
337 #if defined(WEBRTC_WIN)
338 std::wstring wfilename;
339 if (Utf8ToWindowsFilename(filename, &wfilename)) {
340 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
341 } else {
342 if (error) {
343 *error = -1;
344 return false;
345 }
346 }
347 #else
348 file_ = fopen(filename.c_str(), mode);
349 #endif
350 if (!file_ && error) {
351 *error = errno;
352 }
353 return (file_ != NULL);
354 }
355
OpenShare(const std::string & filename,const char * mode,int shflag,int * error)356 bool FileStream::OpenShare(const std::string& filename, const char* mode,
357 int shflag, int* error) {
358 Close();
359 #if defined(WEBRTC_WIN)
360 std::wstring wfilename;
361 if (Utf8ToWindowsFilename(filename, &wfilename)) {
362 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
363 if (!file_ && error) {
364 *error = errno;
365 return false;
366 }
367 return file_ != NULL;
368 } else {
369 if (error) {
370 *error = -1;
371 }
372 return false;
373 }
374 #else
375 return Open(filename, mode, error);
376 #endif
377 }
378
DisableBuffering()379 bool FileStream::DisableBuffering() {
380 if (!file_)
381 return false;
382 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
383 }
384
GetState() const385 StreamState FileStream::GetState() const {
386 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
387 }
388
Read(void * buffer,size_t buffer_len,size_t * read,int * error)389 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
390 size_t* read, int* error) {
391 if (!file_)
392 return SR_EOS;
393 size_t result = fread(buffer, 1, buffer_len, file_);
394 if ((result == 0) && (buffer_len > 0)) {
395 if (feof(file_))
396 return SR_EOS;
397 if (error)
398 *error = errno;
399 return SR_ERROR;
400 }
401 if (read)
402 *read = result;
403 return SR_SUCCESS;
404 }
405
Write(const void * data,size_t data_len,size_t * written,int * error)406 StreamResult FileStream::Write(const void* data, size_t data_len,
407 size_t* written, int* error) {
408 if (!file_)
409 return SR_EOS;
410 size_t result = fwrite(data, 1, data_len, file_);
411 if ((result == 0) && (data_len > 0)) {
412 if (error)
413 *error = errno;
414 return SR_ERROR;
415 }
416 if (written)
417 *written = result;
418 return SR_SUCCESS;
419 }
420
Close()421 void FileStream::Close() {
422 if (file_) {
423 DoClose();
424 file_ = NULL;
425 }
426 }
427
SetPosition(size_t position)428 bool FileStream::SetPosition(size_t position) {
429 if (!file_)
430 return false;
431 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
432 }
433
GetPosition(size_t * position) const434 bool FileStream::GetPosition(size_t* position) const {
435 ASSERT(NULL != position);
436 if (!file_)
437 return false;
438 long result = ftell(file_);
439 if (result < 0)
440 return false;
441 if (position)
442 *position = result;
443 return true;
444 }
445
GetSize(size_t * size) const446 bool FileStream::GetSize(size_t* size) const {
447 ASSERT(NULL != size);
448 if (!file_)
449 return false;
450 struct stat file_stats;
451 if (fstat(fileno(file_), &file_stats) != 0)
452 return false;
453 if (size)
454 *size = file_stats.st_size;
455 return true;
456 }
457
GetAvailable(size_t * size) const458 bool FileStream::GetAvailable(size_t* size) const {
459 ASSERT(NULL != size);
460 if (!GetSize(size))
461 return false;
462 long result = ftell(file_);
463 if (result < 0)
464 return false;
465 if (size)
466 *size -= result;
467 return true;
468 }
469
ReserveSize(size_t size)470 bool FileStream::ReserveSize(size_t size) {
471 // TODO: extend the file to the proper length
472 return true;
473 }
474
GetSize(const std::string & filename,size_t * size)475 bool FileStream::GetSize(const std::string& filename, size_t* size) {
476 struct stat file_stats;
477 if (stat(filename.c_str(), &file_stats) != 0)
478 return false;
479 *size = file_stats.st_size;
480 return true;
481 }
482
Flush()483 bool FileStream::Flush() {
484 if (file_) {
485 return (0 == fflush(file_));
486 }
487 // try to flush empty file?
488 ASSERT(false);
489 return false;
490 }
491
492 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
493
TryLock()494 bool FileStream::TryLock() {
495 if (file_ == NULL) {
496 // Stream not open.
497 ASSERT(false);
498 return false;
499 }
500
501 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
502 }
503
Unlock()504 bool FileStream::Unlock() {
505 if (file_ == NULL) {
506 // Stream not open.
507 ASSERT(false);
508 return false;
509 }
510
511 return flock(fileno(file_), LOCK_UN) == 0;
512 }
513
514 #endif
515
DoClose()516 void FileStream::DoClose() {
517 fclose(file_);
518 }
519
520 ///////////////////////////////////////////////////////////////////////////////
521 // MemoryStream
522 ///////////////////////////////////////////////////////////////////////////////
523
MemoryStreamBase()524 MemoryStreamBase::MemoryStreamBase()
525 : buffer_(NULL), buffer_length_(0), data_length_(0),
526 seek_position_(0) {
527 }
528
GetState() const529 StreamState MemoryStreamBase::GetState() const {
530 return SS_OPEN;
531 }
532
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)533 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
534 size_t* bytes_read, int* error) {
535 if (seek_position_ >= data_length_) {
536 return SR_EOS;
537 }
538 size_t available = data_length_ - seek_position_;
539 if (bytes > available) {
540 // Read partial buffer
541 bytes = available;
542 }
543 memcpy(buffer, &buffer_[seek_position_], bytes);
544 seek_position_ += bytes;
545 if (bytes_read) {
546 *bytes_read = bytes;
547 }
548 return SR_SUCCESS;
549 }
550
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)551 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
552 size_t* bytes_written, int* error) {
553 size_t available = buffer_length_ - seek_position_;
554 if (0 == available) {
555 // Increase buffer size to the larger of:
556 // a) new position rounded up to next 256 bytes
557 // b) double the previous length
558 size_t new_buffer_length =
559 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
560 StreamResult result = DoReserve(new_buffer_length, error);
561 if (SR_SUCCESS != result) {
562 return result;
563 }
564 ASSERT(buffer_length_ >= new_buffer_length);
565 available = buffer_length_ - seek_position_;
566 }
567
568 if (bytes > available) {
569 bytes = available;
570 }
571 memcpy(&buffer_[seek_position_], buffer, bytes);
572 seek_position_ += bytes;
573 if (data_length_ < seek_position_) {
574 data_length_ = seek_position_;
575 }
576 if (bytes_written) {
577 *bytes_written = bytes;
578 }
579 return SR_SUCCESS;
580 }
581
Close()582 void MemoryStreamBase::Close() {
583 // nothing to do
584 }
585
SetPosition(size_t position)586 bool MemoryStreamBase::SetPosition(size_t position) {
587 if (position > data_length_)
588 return false;
589 seek_position_ = position;
590 return true;
591 }
592
GetPosition(size_t * position) const593 bool MemoryStreamBase::GetPosition(size_t* position) const {
594 if (position)
595 *position = seek_position_;
596 return true;
597 }
598
GetSize(size_t * size) const599 bool MemoryStreamBase::GetSize(size_t* size) const {
600 if (size)
601 *size = data_length_;
602 return true;
603 }
604
GetAvailable(size_t * size) const605 bool MemoryStreamBase::GetAvailable(size_t* size) const {
606 if (size)
607 *size = data_length_ - seek_position_;
608 return true;
609 }
610
ReserveSize(size_t size)611 bool MemoryStreamBase::ReserveSize(size_t size) {
612 return (SR_SUCCESS == DoReserve(size, NULL));
613 }
614
DoReserve(size_t size,int * error)615 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
616 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
617 }
618
619 ///////////////////////////////////////////////////////////////////////////////
620
MemoryStream()621 MemoryStream::MemoryStream()
622 : buffer_alloc_(NULL) {
623 }
624
MemoryStream(const char * data)625 MemoryStream::MemoryStream(const char* data)
626 : buffer_alloc_(NULL) {
627 SetData(data, strlen(data));
628 }
629
MemoryStream(const void * data,size_t length)630 MemoryStream::MemoryStream(const void* data, size_t length)
631 : buffer_alloc_(NULL) {
632 SetData(data, length);
633 }
634
~MemoryStream()635 MemoryStream::~MemoryStream() {
636 delete [] buffer_alloc_;
637 }
638
SetData(const void * data,size_t length)639 void MemoryStream::SetData(const void* data, size_t length) {
640 data_length_ = buffer_length_ = length;
641 delete [] buffer_alloc_;
642 buffer_alloc_ = new char[buffer_length_ + kAlignment];
643 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
644 memcpy(buffer_, data, data_length_);
645 seek_position_ = 0;
646 }
647
DoReserve(size_t size,int * error)648 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
649 if (buffer_length_ >= size)
650 return SR_SUCCESS;
651
652 if (char* new_buffer_alloc = new char[size + kAlignment]) {
653 char* new_buffer = reinterpret_cast<char*>(
654 ALIGNP(new_buffer_alloc, kAlignment));
655 memcpy(new_buffer, buffer_, data_length_);
656 delete [] buffer_alloc_;
657 buffer_alloc_ = new_buffer_alloc;
658 buffer_ = new_buffer;
659 buffer_length_ = size;
660 return SR_SUCCESS;
661 }
662
663 if (error) {
664 *error = ENOMEM;
665 }
666 return SR_ERROR;
667 }
668
669 ///////////////////////////////////////////////////////////////////////////////
670
ExternalMemoryStream()671 ExternalMemoryStream::ExternalMemoryStream() {
672 }
673
ExternalMemoryStream(void * data,size_t length)674 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
675 SetData(data, length);
676 }
677
~ExternalMemoryStream()678 ExternalMemoryStream::~ExternalMemoryStream() {
679 }
680
SetData(void * data,size_t length)681 void ExternalMemoryStream::SetData(void* data, size_t length) {
682 data_length_ = buffer_length_ = length;
683 buffer_ = static_cast<char*>(data);
684 seek_position_ = 0;
685 }
686
687 ///////////////////////////////////////////////////////////////////////////////
688 // FifoBuffer
689 ///////////////////////////////////////////////////////////////////////////////
690
FifoBuffer(size_t size)691 FifoBuffer::FifoBuffer(size_t size)
692 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
693 data_length_(0), read_position_(0), owner_(Thread::Current()) {
694 // all events are done on the owner_ thread
695 }
696
FifoBuffer(size_t size,Thread * owner)697 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
698 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
699 data_length_(0), read_position_(0), owner_(owner) {
700 // all events are done on the owner_ thread
701 }
702
~FifoBuffer()703 FifoBuffer::~FifoBuffer() {
704 }
705
GetBuffered(size_t * size) const706 bool FifoBuffer::GetBuffered(size_t* size) const {
707 CritScope cs(&crit_);
708 *size = data_length_;
709 return true;
710 }
711
SetCapacity(size_t size)712 bool FifoBuffer::SetCapacity(size_t size) {
713 CritScope cs(&crit_);
714 if (data_length_ > size) {
715 return false;
716 }
717
718 if (size != buffer_length_) {
719 char* buffer = new char[size];
720 const size_t copy = data_length_;
721 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
722 memcpy(buffer, &buffer_[read_position_], tail_copy);
723 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
724 buffer_.reset(buffer);
725 read_position_ = 0;
726 buffer_length_ = size;
727 }
728 return true;
729 }
730
ReadOffset(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)731 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
732 size_t offset, size_t* bytes_read) {
733 CritScope cs(&crit_);
734 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
735 }
736
WriteOffset(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)737 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
738 size_t offset, size_t* bytes_written) {
739 CritScope cs(&crit_);
740 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
741 }
742
GetState() const743 StreamState FifoBuffer::GetState() const {
744 return state_;
745 }
746
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)747 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
748 size_t* bytes_read, int* error) {
749 CritScope cs(&crit_);
750 const bool was_writable = data_length_ < buffer_length_;
751 size_t copy = 0;
752 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
753
754 if (result == SR_SUCCESS) {
755 // If read was successful then adjust the read position and number of
756 // bytes buffered.
757 read_position_ = (read_position_ + copy) % buffer_length_;
758 data_length_ -= copy;
759 if (bytes_read) {
760 *bytes_read = copy;
761 }
762
763 // if we were full before, and now we're not, post an event
764 if (!was_writable && copy > 0) {
765 PostEvent(owner_, SE_WRITE, 0);
766 }
767 }
768 return result;
769 }
770
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)771 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
772 size_t* bytes_written, int* error) {
773 CritScope cs(&crit_);
774
775 const bool was_readable = (data_length_ > 0);
776 size_t copy = 0;
777 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
778
779 if (result == SR_SUCCESS) {
780 // If write was successful then adjust the number of readable bytes.
781 data_length_ += copy;
782 if (bytes_written) {
783 *bytes_written = copy;
784 }
785
786 // if we didn't have any data to read before, and now we do, post an event
787 if (!was_readable && copy > 0) {
788 PostEvent(owner_, SE_READ, 0);
789 }
790 }
791 return result;
792 }
793
Close()794 void FifoBuffer::Close() {
795 CritScope cs(&crit_);
796 state_ = SS_CLOSED;
797 }
798
GetReadData(size_t * size)799 const void* FifoBuffer::GetReadData(size_t* size) {
800 CritScope cs(&crit_);
801 *size = (read_position_ + data_length_ <= buffer_length_) ?
802 data_length_ : buffer_length_ - read_position_;
803 return &buffer_[read_position_];
804 }
805
ConsumeReadData(size_t size)806 void FifoBuffer::ConsumeReadData(size_t size) {
807 CritScope cs(&crit_);
808 ASSERT(size <= data_length_);
809 const bool was_writable = data_length_ < buffer_length_;
810 read_position_ = (read_position_ + size) % buffer_length_;
811 data_length_ -= size;
812 if (!was_writable && size > 0) {
813 PostEvent(owner_, SE_WRITE, 0);
814 }
815 }
816
GetWriteBuffer(size_t * size)817 void* FifoBuffer::GetWriteBuffer(size_t* size) {
818 CritScope cs(&crit_);
819 if (state_ == SS_CLOSED) {
820 return NULL;
821 }
822
823 // if empty, reset the write position to the beginning, so we can get
824 // the biggest possible block
825 if (data_length_ == 0) {
826 read_position_ = 0;
827 }
828
829 const size_t write_position = (read_position_ + data_length_)
830 % buffer_length_;
831 *size = (write_position > read_position_ || data_length_ == 0) ?
832 buffer_length_ - write_position : read_position_ - write_position;
833 return &buffer_[write_position];
834 }
835
ConsumeWriteBuffer(size_t size)836 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
837 CritScope cs(&crit_);
838 ASSERT(size <= buffer_length_ - data_length_);
839 const bool was_readable = (data_length_ > 0);
840 data_length_ += size;
841 if (!was_readable && size > 0) {
842 PostEvent(owner_, SE_READ, 0);
843 }
844 }
845
GetWriteRemaining(size_t * size) const846 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
847 CritScope cs(&crit_);
848 *size = buffer_length_ - data_length_;
849 return true;
850 }
851
ReadOffsetLocked(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)852 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
853 size_t bytes,
854 size_t offset,
855 size_t* bytes_read) {
856 if (offset >= data_length_) {
857 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
858 }
859
860 const size_t available = data_length_ - offset;
861 const size_t read_position = (read_position_ + offset) % buffer_length_;
862 const size_t copy = std::min(bytes, available);
863 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
864 char* const p = static_cast<char*>(buffer);
865 memcpy(p, &buffer_[read_position], tail_copy);
866 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
867
868 if (bytes_read) {
869 *bytes_read = copy;
870 }
871 return SR_SUCCESS;
872 }
873
WriteOffsetLocked(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)874 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
875 size_t bytes,
876 size_t offset,
877 size_t* bytes_written) {
878 if (state_ == SS_CLOSED) {
879 return SR_EOS;
880 }
881
882 if (data_length_ + offset >= buffer_length_) {
883 return SR_BLOCK;
884 }
885
886 const size_t available = buffer_length_ - data_length_ - offset;
887 const size_t write_position = (read_position_ + data_length_ + offset)
888 % buffer_length_;
889 const size_t copy = std::min(bytes, available);
890 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
891 const char* const p = static_cast<const char*>(buffer);
892 memcpy(&buffer_[write_position], p, tail_copy);
893 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
894
895 if (bytes_written) {
896 *bytes_written = copy;
897 }
898 return SR_SUCCESS;
899 }
900
901
902
903 ///////////////////////////////////////////////////////////////////////////////
904 // LoggingAdapter
905 ///////////////////////////////////////////////////////////////////////////////
906
LoggingAdapter(StreamInterface * stream,LoggingSeverity level,const std::string & label,bool hex_mode)907 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
908 const std::string& label, bool hex_mode)
909 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
910 set_label(label);
911 }
912
set_label(const std::string & label)913 void LoggingAdapter::set_label(const std::string& label) {
914 label_.assign("[");
915 label_.append(label);
916 label_.append("]");
917 }
918
Read(void * buffer,size_t buffer_len,size_t * read,int * error)919 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
920 size_t* read, int* error) {
921 size_t local_read; if (!read) read = &local_read;
922 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
923 error);
924 if (result == SR_SUCCESS) {
925 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
926 }
927 return result;
928 }
929
Write(const void * data,size_t data_len,size_t * written,int * error)930 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
931 size_t* written, int* error) {
932 size_t local_written;
933 if (!written) written = &local_written;
934 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
935 error);
936 if (result == SR_SUCCESS) {
937 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
938 &lms_);
939 }
940 return result;
941 }
942
Close()943 void LoggingAdapter::Close() {
944 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
945 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
946 LOG_V(level_) << label_ << " Closed locally";
947 StreamAdapterInterface::Close();
948 }
949
OnEvent(StreamInterface * stream,int events,int err)950 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
951 if (events & SE_OPEN) {
952 LOG_V(level_) << label_ << " Open";
953 } else if (events & SE_CLOSE) {
954 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
955 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
956 LOG_V(level_) << label_ << " Closed with error: " << err;
957 }
958 StreamAdapterInterface::OnEvent(stream, events, err);
959 }
960
961 ///////////////////////////////////////////////////////////////////////////////
962 // StringStream - Reads/Writes to an external std::string
963 ///////////////////////////////////////////////////////////////////////////////
964
StringStream(std::string * str)965 StringStream::StringStream(std::string* str)
966 : str_(*str), read_pos_(0), read_only_(false) {
967 }
968
StringStream(const std::string & str)969 StringStream::StringStream(const std::string& str)
970 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
971 }
972
GetState() const973 StreamState StringStream::GetState() const {
974 return SS_OPEN;
975 }
976
Read(void * buffer,size_t buffer_len,size_t * read,int * error)977 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
978 size_t* read, int* error) {
979 size_t available = std::min(buffer_len, str_.size() - read_pos_);
980 if (!available)
981 return SR_EOS;
982 memcpy(buffer, str_.data() + read_pos_, available);
983 read_pos_ += available;
984 if (read)
985 *read = available;
986 return SR_SUCCESS;
987 }
988
Write(const void * data,size_t data_len,size_t * written,int * error)989 StreamResult StringStream::Write(const void* data, size_t data_len,
990 size_t* written, int* error) {
991 if (read_only_) {
992 if (error) {
993 *error = -1;
994 }
995 return SR_ERROR;
996 }
997 str_.append(static_cast<const char*>(data),
998 static_cast<const char*>(data) + data_len);
999 if (written)
1000 *written = data_len;
1001 return SR_SUCCESS;
1002 }
1003
Close()1004 void StringStream::Close() {
1005 }
1006
SetPosition(size_t position)1007 bool StringStream::SetPosition(size_t position) {
1008 if (position > str_.size())
1009 return false;
1010 read_pos_ = position;
1011 return true;
1012 }
1013
GetPosition(size_t * position) const1014 bool StringStream::GetPosition(size_t* position) const {
1015 if (position)
1016 *position = read_pos_;
1017 return true;
1018 }
1019
GetSize(size_t * size) const1020 bool StringStream::GetSize(size_t* size) const {
1021 if (size)
1022 *size = str_.size();
1023 return true;
1024 }
1025
GetAvailable(size_t * size) const1026 bool StringStream::GetAvailable(size_t* size) const {
1027 if (size)
1028 *size = str_.size() - read_pos_;
1029 return true;
1030 }
1031
ReserveSize(size_t size)1032 bool StringStream::ReserveSize(size_t size) {
1033 if (read_only_)
1034 return false;
1035 str_.reserve(size);
1036 return true;
1037 }
1038
1039 ///////////////////////////////////////////////////////////////////////////////
1040 // StreamReference
1041 ///////////////////////////////////////////////////////////////////////////////
1042
StreamReference(StreamInterface * stream)1043 StreamReference::StreamReference(StreamInterface* stream)
1044 : StreamAdapterInterface(stream, false) {
1045 // owner set to false so the destructor does not free the stream.
1046 stream_ref_count_ = new StreamRefCount(stream);
1047 }
1048
NewReference()1049 StreamInterface* StreamReference::NewReference() {
1050 stream_ref_count_->AddReference();
1051 return new StreamReference(stream_ref_count_, stream());
1052 }
1053
~StreamReference()1054 StreamReference::~StreamReference() {
1055 stream_ref_count_->Release();
1056 }
1057
StreamReference(StreamRefCount * stream_ref_count,StreamInterface * stream)1058 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1059 StreamInterface* stream)
1060 : StreamAdapterInterface(stream, false),
1061 stream_ref_count_(stream_ref_count) {
1062 }
1063
1064 ///////////////////////////////////////////////////////////////////////////////
1065
Flow(StreamInterface * source,char * buffer,size_t buffer_len,StreamInterface * sink,size_t * data_len)1066 StreamResult Flow(StreamInterface* source,
1067 char* buffer, size_t buffer_len,
1068 StreamInterface* sink,
1069 size_t* data_len /* = NULL */) {
1070 ASSERT(buffer_len > 0);
1071
1072 StreamResult result;
1073 size_t count, read_pos, write_pos;
1074 if (data_len) {
1075 read_pos = *data_len;
1076 } else {
1077 read_pos = 0;
1078 }
1079
1080 bool end_of_stream = false;
1081 do {
1082 // Read until buffer is full, end of stream, or error
1083 while (!end_of_stream && (read_pos < buffer_len)) {
1084 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1085 &count, NULL);
1086 if (result == SR_EOS) {
1087 end_of_stream = true;
1088 } else if (result != SR_SUCCESS) {
1089 if (data_len) {
1090 *data_len = read_pos;
1091 }
1092 return result;
1093 } else {
1094 read_pos += count;
1095 }
1096 }
1097
1098 // Write until buffer is empty, or error (including end of stream)
1099 write_pos = 0;
1100 while (write_pos < read_pos) {
1101 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1102 &count, NULL);
1103 if (result != SR_SUCCESS) {
1104 if (data_len) {
1105 *data_len = read_pos - write_pos;
1106 if (write_pos > 0) {
1107 memmove(buffer, buffer + write_pos, *data_len);
1108 }
1109 }
1110 return result;
1111 }
1112 write_pos += count;
1113 }
1114
1115 read_pos = 0;
1116 } while (!end_of_stream);
1117
1118 if (data_len) {
1119 *data_len = 0;
1120 }
1121 return SR_SUCCESS;
1122 }
1123
1124 ///////////////////////////////////////////////////////////////////////////////
1125
1126 } // namespace rtc
1127