/*
 * libjingle
 * Copyright 2004--2010, Google Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *  3. The name of the author may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

28 #ifndef TALK_BASE_STREAM_H__
29 #define TALK_BASE_STREAM_H__
30 
31 #include "talk/base/basictypes.h"
32 #include "talk/base/criticalsection.h"
33 #include "talk/base/logging.h"
34 #include "talk/base/messagehandler.h"
35 #include "talk/base/scoped_ptr.h"
36 #include "talk/base/sigslot.h"
37 
38 namespace talk_base {
39 
///////////////////////////////////////////////////////////////////////////////
// StreamInterface is a generic asynchronous stream interface, supporting read,
// write, and close operations, and asynchronous signalling of state changes.
// The interface is designed with file, memory, and socket implementations in
// mind.  Some implementations offer extended operations, such as seeking.
///////////////////////////////////////////////////////////////////////////////

// The following enumerations are declared outside of the StreamInterface
// class for brevity in use.

// Lifecycle state reported by a stream's GetState().
// The SS_OPENING state indicates that the stream will signal open or closed
// in the future.
enum StreamState {
  SS_CLOSED,
  SS_OPENING,
  SS_OPEN
};

// Stream read/write methods return this value to indicate various success
// and failure conditions.  The meaning of each value is described alongside
// the Read and Write declarations.
enum StreamResult {
  SR_ERROR,
  SR_SUCCESS,
  SR_BLOCK,
  SR_EOS
};

// StreamEvents are used to asynchronously signal state transitions.  Each
// value occupies its own bit, so the flags may be combined.
//  SE_OPEN: The stream has transitioned to the SS_OPEN state
//  SE_CLOSE: The stream has transitioned to the SS_CLOSED state
//  SE_READ: Data is available, so Read is likely to not return SR_BLOCK
//  SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
enum StreamEvent {
  SE_OPEN = 1,
  SE_READ = 2,
  SE_WRITE = 4,
  SE_CLOSE = 8
};

// Forward declaration; this header only uses Thread through pointers.
class Thread;

68 class StreamInterface : public MessageHandler {
69  public:
70   virtual ~StreamInterface();
71 
72   virtual StreamState GetState() const = 0;
73 
74   // Read attempts to fill buffer of size buffer_len.  Write attempts to send
75   // data_len bytes stored in data.  The variables read and write are set only
76   // on SR_SUCCESS (see below).  Likewise, error is only set on SR_ERROR.
77   // Read and Write return a value indicating:
78   //  SR_ERROR: an error occurred, which is returned in a non-null error
79   //    argument.  Interpretation of the error requires knowledge of the
80   //    stream's concrete type, which limits its usefulness.
81   //  SR_SUCCESS: some number of bytes were successfully written, which is
82   //    returned in a non-null read/write argument.
83   //  SR_BLOCK: the stream is in non-blocking mode, and the operation would
84   //    block, or the stream is in SS_OPENING state.
85   //  SR_EOS: the end-of-stream has been reached, or the stream is in the
86   //    SS_CLOSED state.
87   virtual StreamResult Read(void* buffer, size_t buffer_len,
88                             size_t* read, int* error) = 0;
89   virtual StreamResult Write(const void* data, size_t data_len,
90                              size_t* written, int* error) = 0;
91   // Attempt to transition to the SS_CLOSED state.  SE_CLOSE will not be
92   // signalled as a result of this call.
93   virtual void Close() = 0;
94 
95   // Streams may signal one or more StreamEvents to indicate state changes.
96   // The first argument identifies the stream on which the state change occured.
97   // The second argument is a bit-wise combination of StreamEvents.
98   // If SE_CLOSE is signalled, then the third argument is the associated error
99   // code.  Otherwise, the value is undefined.
100   // Note: Not all streams will support asynchronous event signalling.  However,
101   // SS_OPENING and SR_BLOCK returned from stream member functions imply that
102   // certain events will be raised in the future.
103   sigslot::signal3<StreamInterface*, int, int> SignalEvent;
104 
105   // Like calling SignalEvent, but posts a message to the specified thread,
106   // which will call SignalEvent.  This helps unroll the stack and prevent
107   // re-entrancy.
108   void PostEvent(Thread* t, int events, int err);
109   // Like the aforementioned method, but posts to the current thread.
110   void PostEvent(int events, int err);
111 
112   //
113   // OPTIONAL OPERATIONS
114   //
115   // Not all implementations will support the following operations.  In general,
116   // a stream will only support an operation if it reasonably efficient to do
117   // so.  For example, while a socket could buffer incoming data to support
118   // seeking, it will not do so.  Instead, a buffering stream adapter should
119   // be used.
120   //
121   // Even though several of these operations are related, you should
122   // always use whichever operation is most relevant.  For example, you may
123   // be tempted to use GetSize() and GetPosition() to deduce the result of
124   // GetAvailable().  However, a stream which is read-once may support the
125   // latter operation but not the former.
126   //
127 
128   // The following four methods are used to avoid coping data multiple times.
129 
130   // GetReadData returns a pointer to a buffer which is owned by the stream.
131   // The buffer contains data_len bytes.  NULL is returned if no data is
132   // available, or if the method fails.  If the caller processes the data, it
133   // must call ConsumeReadData with the number of processed bytes.  GetReadData
134   // does not require a matching call to ConsumeReadData if the data is not
135   // processed.  Read and ConsumeReadData invalidate the buffer returned by
136   // GetReadData.
GetReadData(size_t * data_len)137   virtual const void* GetReadData(size_t* data_len) { return NULL; }
ConsumeReadData(size_t used)138   virtual void ConsumeReadData(size_t used) {}
139 
140   // GetWriteBuffer returns a pointer to a buffer which is owned by the stream.
141   // The buffer has a capacity of buf_len bytes.  NULL is returned if there is
142   // no buffer available, or if the method fails.  The call may write data to
143   // the buffer, and then call ConsumeWriteBuffer with the number of bytes
144   // written.  GetWriteBuffer does not require a matching call to
145   // ConsumeWriteData if no data is written.  Write, ForceWrite, and
146   // ConsumeWriteData invalidate the buffer returned by GetWriteBuffer.
147   // TODO: Allow the caller to specify a minimum buffer size.  If the specified
148   // amount of buffer is not yet available, return NULL and Signal SE_WRITE
149   // when it is available.  If the requested amount is too large, return an
150   // error.
GetWriteBuffer(size_t * buf_len)151   virtual void* GetWriteBuffer(size_t* buf_len) { return NULL; }
ConsumeWriteBuffer(size_t used)152   virtual void ConsumeWriteBuffer(size_t used) {}
153 
154   // Write data_len bytes found in data, circumventing any throttling which
155   // would could cause SR_BLOCK to be returned.  Returns true if all the data
156   // was written.  Otherwise, the method is unsupported, or an unrecoverable
157   // error occurred, and the error value is set.  This method should be used
158   // sparingly to write critical data which should not be throttled.  A stream
159   // which cannot circumvent its blocking constraints should not implement this
160   // method.
161   // NOTE: This interface is being considered experimentally at the moment.  It
162   // would be used by JUDP and BandwidthStream as a way to circumvent certain
163   // soft limits in writing.
164   //virtual bool ForceWrite(const void* data, size_t data_len, int* error) {
165   //  if (error) *error = -1;
166   //  return false;
167   //}
168 
169   // Seek to a byte offset from the beginning of the stream.  Returns false if
170   // the stream does not support seeking, or cannot seek to the specified
171   // position.
SetPosition(size_t position)172   virtual bool SetPosition(size_t position) { return false; }
173 
174   // Get the byte offset of the current position from the start of the stream.
175   // Returns false if the position is not known.
GetPosition(size_t * position)176   virtual bool GetPosition(size_t* position) const { return false; }
177 
178   // Get the byte length of the entire stream.  Returns false if the length
179   // is not known.
GetSize(size_t * size)180   virtual bool GetSize(size_t* size) const { return false; }
181 
182   // Return the number of Read()-able bytes remaining before end-of-stream.
183   // Returns false if not known.
GetAvailable(size_t * size)184   virtual bool GetAvailable(size_t* size) const { return false; }
185 
186   // Return the number of Write()-able bytes remaining before end-of-stream.
187   // Returns false if not known.
GetWriteRemaining(size_t * size)188   virtual bool GetWriteRemaining(size_t* size) const { return false; }
189 
190   // Communicates the amount of data which will be written to the stream.  The
191   // stream may choose to preallocate memory to accomodate this data.  The
192   // stream may return false to indicate that there is not enough room (ie,
193   // Write will return SR_EOS/SR_ERROR at some point).  Note that calling this
194   // function should not affect the existing state of data in the stream.
ReserveSize(size_t size)195   virtual bool ReserveSize(size_t size) { return true; }
196 
197   //
198   // CONVENIENCE METHODS
199   //
200   // These methods are implemented in terms of other methods, for convenience.
201   //
202 
203   // Seek to the start of the stream.
Rewind()204   inline bool Rewind() { return SetPosition(0); }
205 
206   // WriteAll is a helper function which repeatedly calls Write until all the
207   // data is written, or something other than SR_SUCCESS is returned.  Note that
208   // unlike Write, the argument 'written' is always set, and may be non-zero
209   // on results other than SR_SUCCESS.  The remaining arguments have the
210   // same semantics as Write.
211   StreamResult WriteAll(const void* data, size_t data_len,
212                         size_t* written, int* error);
213 
214   // Similar to ReadAll.  Calls Read until buffer_len bytes have been read, or
215   // until a non-SR_SUCCESS result is returned.  'read' is always set.
216   StreamResult ReadAll(void* buffer, size_t buffer_len,
217                        size_t* read, int* error);
218 
219   // ReadLine is a helper function which repeatedly calls Read until it hits
220   // the end-of-line character, or something other than SR_SUCCESS.
221   // TODO: this is too inefficient to keep here.  Break this out into a buffered
222   // readline object or adapter
223   StreamResult ReadLine(std::string *line);
224 
225  protected:
226   StreamInterface();
227 
228   // MessageHandler Interface
229   virtual void OnMessage(Message* msg);
230 
231  private:
232   DISALLOW_EVIL_CONSTRUCTORS(StreamInterface);
233 };
234 
235 ///////////////////////////////////////////////////////////////////////////////
236 // StreamAdapterInterface is a convenient base-class for adapting a stream.
237 // By default, all operations are pass-through.  Override the methods that you
238 // require adaptation.  Streams should really be upgraded to reference-counted.
239 // In the meantime, use the owned flag to indicate whether the adapter should
240 // own the adapted stream.
241 ///////////////////////////////////////////////////////////////////////////////
242 
243 class StreamAdapterInterface : public StreamInterface,
244                                public sigslot::has_slots<> {
245  public:
246   explicit StreamAdapterInterface(StreamInterface* stream, bool owned = true);
247 
248   // Core Stream Interface
GetState()249   virtual StreamState GetState() const {
250     return stream_->GetState();
251   }
Read(void * buffer,size_t buffer_len,size_t * read,int * error)252   virtual StreamResult Read(void* buffer, size_t buffer_len,
253                             size_t* read, int* error) {
254     return stream_->Read(buffer, buffer_len, read, error);
255   }
Write(const void * data,size_t data_len,size_t * written,int * error)256   virtual StreamResult Write(const void* data, size_t data_len,
257                              size_t* written, int* error) {
258     return stream_->Write(data, data_len, written, error);
259   }
Close()260   virtual void Close() {
261     stream_->Close();
262   }
263 
264   // Optional Stream Interface
265   /*  Note: Many stream adapters were implemented prior to this Read/Write
266       interface.  Therefore, a simple pass through of data in those cases may
267       be broken.  At a later time, we should do a once-over pass of all
268       adapters, and make them compliant with these interfaces, after which this
269       code can be uncommented.
270   virtual const void* GetReadData(size_t* data_len) {
271     return stream_->GetReadData(data_len);
272   }
273   virtual void ConsumeReadData(size_t used) {
274     stream_->ConsumeReadData(used);
275   }
276 
277   virtual void* GetWriteBuffer(size_t* buf_len) {
278     return stream_->GetWriteBuffer(buf_len);
279   }
280   virtual void ConsumeWriteBuffer(size_t used) {
281     stream_->ConsumeWriteBuffer(used);
282   }
283   */
284 
285   /*  Note: This interface is currently undergoing evaluation.
286   virtual bool ForceWrite(const void* data, size_t data_len, int* error) {
287     return stream_->ForceWrite(data, data_len, error);
288   }
289   */
290 
SetPosition(size_t position)291   virtual bool SetPosition(size_t position) {
292     return stream_->SetPosition(position);
293   }
GetPosition(size_t * position)294   virtual bool GetPosition(size_t* position) const {
295     return stream_->GetPosition(position);
296   }
GetSize(size_t * size)297   virtual bool GetSize(size_t* size) const {
298     return stream_->GetSize(size);
299   }
GetAvailable(size_t * size)300   virtual bool GetAvailable(size_t* size) const {
301     return stream_->GetAvailable(size);
302   }
GetWriteRemaining(size_t * size)303   virtual bool GetWriteRemaining(size_t* size) const {
304     return stream_->GetWriteRemaining(size);
305   }
ReserveSize(size_t size)306   virtual bool ReserveSize(size_t size) {
307     return stream_->ReserveSize(size);
308   }
309 
310   void Attach(StreamInterface* stream, bool owned = true);
311   StreamInterface* Detach();
312 
313  protected:
314   virtual ~StreamAdapterInterface();
315 
316   // Note that the adapter presents itself as the origin of the stream events,
317   // since users of the adapter may not recognize the adapted object.
OnEvent(StreamInterface * stream,int events,int err)318   virtual void OnEvent(StreamInterface* stream, int events, int err) {
319     SignalEvent(this, events, err);
320   }
stream()321   StreamInterface* stream() { return stream_; }
322 
323  private:
324   StreamInterface* stream_;
325   bool owned_;
326   DISALLOW_EVIL_CONSTRUCTORS(StreamAdapterInterface);
327 };
328 
329 ///////////////////////////////////////////////////////////////////////////////
330 // StreamTap is a non-modifying, pass-through adapter, which copies all data
331 // in either direction to the tap.  Note that errors or blocking on writing to
332 // the tap will prevent further tap writes from occurring.
333 ///////////////////////////////////////////////////////////////////////////////
334 
335 class StreamTap : public StreamAdapterInterface {
336  public:
337   explicit StreamTap(StreamInterface* stream, StreamInterface* tap);
338 
339   void AttachTap(StreamInterface* tap);
340   StreamInterface* DetachTap();
341   StreamResult GetTapResult(int* error);
342 
343   // StreamAdapterInterface Interface
344   virtual StreamResult Read(void* buffer, size_t buffer_len,
345                             size_t* read, int* error);
346   virtual StreamResult Write(const void* data, size_t data_len,
347                              size_t* written, int* error);
348 
349  private:
350   scoped_ptr<StreamInterface> tap_;
351   StreamResult tap_result_;
352   int tap_error_;
353   DISALLOW_EVIL_CONSTRUCTORS(StreamTap);
354 };
355 
356 ///////////////////////////////////////////////////////////////////////////////
357 // StreamSegment adapts a read stream, to expose a subset of the adapted
358 // stream's data.  This is useful for cases where a stream contains multiple
359 // documents concatenated together.  StreamSegment can expose a subset of
360 // the data as an independent stream, including support for rewinding and
361 // seeking.
362 ///////////////////////////////////////////////////////////////////////////////
363 
364 class StreamSegment : public StreamAdapterInterface {
365  public:
366   // The current position of the adapted stream becomes the beginning of the
367   // segment.  If a length is specified, it bounds the length of the segment.
368   explicit StreamSegment(StreamInterface* stream);
369   explicit StreamSegment(StreamInterface* stream, size_t length);
370 
371   // StreamAdapterInterface Interface
372   virtual StreamResult Read(void* buffer, size_t buffer_len,
373                             size_t* read, int* error);
374   virtual bool SetPosition(size_t position);
375   virtual bool GetPosition(size_t* position) const;
376   virtual bool GetSize(size_t* size) const;
377   virtual bool GetAvailable(size_t* size) const;
378 
379  private:
380   size_t start_, pos_, length_;
381   DISALLOW_EVIL_CONSTRUCTORS(StreamSegment);
382 };
383 
384 ///////////////////////////////////////////////////////////////////////////////
385 // NullStream gives errors on read, and silently discards all written data.
386 ///////////////////////////////////////////////////////////////////////////////
387 
388 class NullStream : public StreamInterface {
389  public:
390   NullStream();
391   virtual ~NullStream();
392 
393   // StreamInterface Interface
394   virtual StreamState GetState() const;
395   virtual StreamResult Read(void* buffer, size_t buffer_len,
396                             size_t* read, int* error);
397   virtual StreamResult Write(const void* data, size_t data_len,
398                              size_t* written, int* error);
399   virtual void Close();
400 };
401 
402 ///////////////////////////////////////////////////////////////////////////////
403 // FileStream is a simple implementation of a StreamInterface, which does not
404 // support asynchronous notification.
405 ///////////////////////////////////////////////////////////////////////////////
406 
407 class FileStream : public StreamInterface {
408  public:
409   FileStream();
410   virtual ~FileStream();
411 
412   // The semantics of filename and mode are the same as stdio's fopen
413   virtual bool Open(const std::string& filename, const char* mode);
414   virtual bool OpenShare(const std::string& filename, const char* mode,
415                          int shflag);
416 
417   // By default, reads and writes are buffered for efficiency.  Disabling
418   // buffering causes writes to block until the bytes on disk are updated.
419   virtual bool DisableBuffering();
420 
421   virtual StreamState GetState() const;
422   virtual StreamResult Read(void* buffer, size_t buffer_len,
423                             size_t* read, int* error);
424   virtual StreamResult Write(const void* data, size_t data_len,
425                              size_t* written, int* error);
426   virtual void Close();
427   virtual bool SetPosition(size_t position);
428   virtual bool GetPosition(size_t* position) const;
429   virtual bool GetSize(size_t* size) const;
430   virtual bool GetAvailable(size_t* size) const;
431   virtual bool ReserveSize(size_t size);
432 
433   bool Flush();
434 
435 #if defined(POSIX)
436   // Tries to aquire an exclusive lock on the file.
437   // Use OpenShare(...) on win32 to get similar functionality.
438   bool TryLock();
439   bool Unlock();
440 #endif
441 
442   // Note: Deprecated in favor of Filesystem::GetFileSize().
443   static bool GetSize(const std::string& filename, size_t* size);
444 
445  protected:
446   virtual void DoClose();
447 
448   FILE* file_;
449 
450  private:
451   DISALLOW_EVIL_CONSTRUCTORS(FileStream);
452 };
453 
#ifdef POSIX
// A FileStream that is actually not a file, but the output or input of a
// sub-command. See "man 3 popen" for documentation of the underlying OS popen()
// function.
class POpenStream : public FileStream {
 public:
  // The wait status starts at -1, meaning no Open()+Close() has happened yet.
  POpenStream() : wait_status_(-1) {}
  virtual ~POpenStream();

  // Runs |subcommand| instead of opening a file; see "man 3 popen" for the
  // meaning of |mode|.
  virtual bool Open(const std::string& subcommand, const char* mode);
  // Same as Open(). shflag is ignored.
  virtual bool OpenShare(const std::string& subcommand, const char* mode,
                         int shflag);

  // Returns the wait status from the last Close() of an Open()'ed stream, or
  // -1 if no Open()+Close() has been done on this object. Meaning of the number
  // is documented in "man 2 wait".
  int GetWaitStatus() const { return wait_status_; }

 protected:
  // Overrides the FileStream close hook.
  virtual void DoClose();

 private:
  int wait_status_;
};
#endif  // POSIX

481 ///////////////////////////////////////////////////////////////////////////////
482 // MemoryStream is a simple implementation of a StreamInterface over in-memory
483 // data.  Data is read and written at the current seek position.  Reads return
484 // end-of-stream when they reach the end of data.  Writes actually extend the
485 // end of data mark.
486 ///////////////////////////////////////////////////////////////////////////////
487 
488 class MemoryStreamBase : public StreamInterface {
489  public:
490   virtual StreamState GetState() const;
491   virtual StreamResult Read(void* buffer, size_t bytes, size_t* bytes_read,
492                             int* error);
493   virtual StreamResult Write(const void* buffer, size_t bytes,
494                              size_t* bytes_written, int* error);
495   virtual void Close();
496   virtual bool SetPosition(size_t position);
497   virtual bool GetPosition(size_t* position) const;
498   virtual bool GetSize(size_t* size) const;
499   virtual bool GetAvailable(size_t* size) const;
500   virtual bool ReserveSize(size_t size);
501 
GetBuffer()502   char* GetBuffer() { return buffer_; }
GetBuffer()503   const char* GetBuffer() const { return buffer_; }
504 
505  protected:
506   MemoryStreamBase();
507 
508   virtual StreamResult DoReserve(size_t size, int* error);
509 
510   // Invariant: 0 <= seek_position <= data_length_ <= buffer_length_
511   char* buffer_;
512   size_t buffer_length_;
513   size_t data_length_;
514   size_t seek_position_;
515 
516  private:
517   DISALLOW_EVIL_CONSTRUCTORS(MemoryStreamBase);
518 };
519 
520 // MemoryStream dynamically resizes to accomodate written data.
521 
522 class MemoryStream : public MemoryStreamBase {
523  public:
524   MemoryStream();
525   explicit MemoryStream(const char* data);  // Calls SetData(data, strlen(data))
526   MemoryStream(const void* data, size_t length);  // Calls SetData(data, length)
527   virtual ~MemoryStream();
528 
529   void SetData(const void* data, size_t length);
530 
531  protected:
532   virtual StreamResult DoReserve(size_t size, int* error);
533   // Memory Streams are aligned for efficiency.
534   static const int kAlignment = 16;
535   char* buffer_alloc_;
536 };
537 
538 // ExternalMemoryStream adapts an external memory buffer, so writes which would
539 // extend past the end of the buffer will return end-of-stream.
540 
541 class ExternalMemoryStream : public MemoryStreamBase {
542  public:
543   ExternalMemoryStream();
544   ExternalMemoryStream(void* data, size_t length);
545   virtual ~ExternalMemoryStream();
546 
547   void SetData(void* data, size_t length);
548 };
549 
550 // FifoBuffer allows for efficient, thread-safe buffering of data between
551 // writer and reader. As the data can wrap around the end of the buffer,
552 // MemoryStreamBase can't help us here.
553 
554 class FifoBuffer : public StreamInterface {
555  public:
556   // Creates a FIFO buffer with the specified capacity.
557   explicit FifoBuffer(size_t length);
558   virtual ~FifoBuffer();
559   // Gets the amount of data currently readable from the buffer.
560   bool GetBuffered(size_t* data_len) const;
561   // Resizes the buffer to the specified capacity. Fails if data_length_ > size
562   bool SetCapacity(size_t length);
563 
564   // StreamInterface methods
565   virtual StreamState GetState() const;
566   virtual StreamResult Read(void* buffer, size_t bytes,
567                             size_t* bytes_read, int* error);
568   virtual StreamResult Write(const void* buffer, size_t bytes,
569                              size_t* bytes_written, int* error);
570   virtual void Close();
571   virtual const void* GetReadData(size_t* data_len);
572   virtual void ConsumeReadData(size_t used);
573   virtual void* GetWriteBuffer(size_t *buf_len);
574   virtual void ConsumeWriteBuffer(size_t used);
575 
576  private:
577   StreamState state_;  // keeps the opened/closed state of the stream
578   scoped_array<char> buffer_;  // the allocated buffer
579   size_t buffer_length_;  // size of the allocated buffer
580   size_t data_length_;  // amount of readable data in the buffer
581   size_t read_position_;  // offset to the readable data
582   Thread* owner_;  // stream callbacks are dispatched on this thread
583   mutable CriticalSection crit_;  // object lock
584   DISALLOW_EVIL_CONSTRUCTORS(FifoBuffer);
585 };
586 
587 ///////////////////////////////////////////////////////////////////////////////
588 
589 class LoggingAdapter : public StreamAdapterInterface {
590  public:
591   LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
592                  const std::string& label, bool hex_mode = false);
593 
594   void set_label(const std::string& label);
595 
596   virtual StreamResult Read(void* buffer, size_t buffer_len,
597                             size_t* read, int* error);
598   virtual StreamResult Write(const void* data, size_t data_len,
599                              size_t* written, int* error);
600   virtual void Close();
601 
602  protected:
603   virtual void OnEvent(StreamInterface* stream, int events, int err);
604 
605  private:
606   LoggingSeverity level_;
607   std::string label_;
608   bool hex_mode_;
609   LogMultilineState lms_;
610 
611   DISALLOW_EVIL_CONSTRUCTORS(LoggingAdapter);
612 };
613 
614 ///////////////////////////////////////////////////////////////////////////////
615 // StringStream - Reads/Writes to an external std::string
616 ///////////////////////////////////////////////////////////////////////////////
617 
618 class StringStream : public StreamInterface {
619  public:
620   explicit StringStream(std::string& str);
621   explicit StringStream(const std::string& str);
622 
623   virtual StreamState GetState() const;
624   virtual StreamResult Read(void* buffer, size_t buffer_len,
625                             size_t* read, int* error);
626   virtual StreamResult Write(const void* data, size_t data_len,
627                              size_t* written, int* error);
628   virtual void Close();
629   virtual bool SetPosition(size_t position);
630   virtual bool GetPosition(size_t* position) const;
631   virtual bool GetSize(size_t* size) const;
632   virtual bool GetAvailable(size_t* size) const;
633   virtual bool ReserveSize(size_t size);
634 
635  private:
636   std::string& str_;
637   size_t read_pos_;
638   bool read_only_;
639 };
640 
641 ///////////////////////////////////////////////////////////////////////////////
642 // StreamReference - A reference counting stream adapter
643 ///////////////////////////////////////////////////////////////////////////////
644 
645 // Keep in mind that the streams and adapters defined in this file are
646 // not thread-safe, so this has limited uses.
647 
648 // A StreamRefCount holds the reference count and a pointer to the
649 // wrapped stream. It deletes the wrapped stream when there are no
650 // more references. We can then have multiple StreamReference
651 // instances pointing to one StreamRefCount, all wrapping the same
652 // stream.
653 
654 class StreamReference : public StreamAdapterInterface {
655   class StreamRefCount;
656  public:
657   // Constructor for the first reference to a stream
658   // Note: get more references through NewReference(). Use this
659   // constructor only once on a given stream.
660   explicit StreamReference(StreamInterface* stream);
GetStream()661   StreamInterface* GetStream() { return stream(); }
662   StreamInterface* NewReference();
663   virtual ~StreamReference();
664 
665  private:
666   class StreamRefCount {
667    public:
StreamRefCount(StreamInterface * stream)668     explicit StreamRefCount(StreamInterface* stream)
669         : stream_(stream), ref_count_(1) {
670     }
AddReference()671     void AddReference() {
672       CritScope lock(&cs_);
673       ++ref_count_;
674     }
Release()675     void Release() {
676       int ref_count;
677       {  // Atomic ops would have been a better fit here.
678         CritScope lock(&cs_);
679         ref_count = --ref_count_;
680       }
681       if (ref_count == 0) {
682         delete stream_;
683         delete this;
684       }
685     }
686    private:
687     StreamInterface* stream_;
688     int ref_count_;
689     CriticalSection cs_;
690     DISALLOW_EVIL_CONSTRUCTORS(StreamRefCount);
691   };
692 
693   // Constructor for adding references
694   explicit StreamReference(StreamRefCount* stream_ref_count,
695                            StreamInterface* stream);
696 
697   StreamRefCount* stream_ref_count_;
698   DISALLOW_EVIL_CONSTRUCTORS(StreamReference);
699 };
700 
701 ///////////////////////////////////////////////////////////////////////////////
702 
703 // Flow attempts to move bytes from source to sink via buffer of size
704 // buffer_len.  The function returns SR_SUCCESS when source reaches
705 // end-of-stream (returns SR_EOS), and all the data has been written successful
706 // to sink.  Alternately, if source returns SR_BLOCK or SR_ERROR, or if sink
707 // returns SR_BLOCK, SR_ERROR, or SR_EOS, then the function immediately returns
708 // with the unexpected StreamResult value.
709 // data_len is the length of the valid data in buffer. in case of error
710 // this is the data that read from source but can't move to destination.
711 // as a pass in parameter, it indicates data in buffer that should move to sink
712 StreamResult Flow(StreamInterface* source,
713                   char* buffer, size_t buffer_len,
714                   StreamInterface* sink, size_t* data_len = NULL);
715 
716 ///////////////////////////////////////////////////////////////////////////////
717 
718 }  // namespace talk_base
719 
720 #endif  // TALK_BASE_STREAM_H__
721