• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 //
5 // Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a
6 // net::StreamSocket. Built to work with Protobuf CodedStreams.
7 
8 #ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
9 #define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
10 
11 #include "base/basictypes.h"
12 #include "base/callback_forward.h"
13 #include "base/compiler_specific.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/memory/weak_ptr.h"
16 #include "google/protobuf/io/zero_copy_stream.h"
17 #include "google_apis/gcm/base/gcm_export.h"
18 #include "net/base/net_errors.h"
19 
20 namespace net {  // Forward declarations; used below only via pointer/scoped_refptr.
21 class DrainableIOBuffer;
22 class IOBuffer;
23 class StreamSocket;
24 }  // namespace net
25 
26 namespace gcm {
27 
28 // A helper class for interacting with a net::StreamSocket that is receiving
29 // protobuf encoded messages. A SocketInputStream does not take ownership of
30 // the socket itself, and it is expected that the life of the input stream
31 // should match the life of the socket itself (while the socket remains
32 // connected). If an error is encountered, the input stream will store the
33 // error in |last_error_|, and GetState() will return CLOSED.
34 // Typical usage:
35 // 1. Check the GetState() of the input stream before using it. If CLOSED, the
36 //    input stream must be rebuilt (and the socket likely needs to be
37 //    reconnected as an error was encountered).
38 // 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size
39 //    for a message, and wait until completion. It is invalid to attempt to
40 //    Refresh an input stream or read data from the stream while a Refresh is
41 //    pending.
42 // 3. Check GetState() again to ensure the Refresh was successful.
43 // 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of
44 //    the SocketInputStream. Next(..) will return true until there is no data
45 //    remaining.
46 // 5. Call RebuildBuffer when done reading, to shift any unread data to the
47 //    start of the buffer.
48 // 6. Repeat as necessary.
49 class GCM_EXPORT SocketInputStream
50     : public google::protobuf::io::ZeroCopyInputStream {
51  public:
52   enum State {
53     // No valid data to read. This means the buffer is either empty or all data
54     // in the buffer has already been consumed.
55     EMPTY,
56     // Valid data to read.
57     READY,
58     // In the process of reading new data from the socket.
59     READING,
60     // A permanent error occurred and the stream is now closed.
61     CLOSED,
62   };
63 
64   // |socket| should already be connected; it is not owned by this stream.
65   explicit SocketInputStream(net::StreamSocket* socket);
66   virtual ~SocketInputStream();
67 
68   // ZeroCopyInputStream implementation.
69   virtual bool Next(const void** data, int* size) OVERRIDE;
70   virtual void BackUp(int count) OVERRIDE;
71   virtual bool Skip(int count) OVERRIDE;  // Not implemented.
72   virtual int64 ByteCount() const OVERRIDE;
73 
74   // The remaining amount of valid data available to be read.
75   int UnreadByteCount() const;
76 
77   // Reads from the socket, appending a max of |byte_limit| bytes onto the read
78   // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete
79   // synchronously, in which case the callback is invoked upon completion. If
80   // the refresh can complete synchronously, even in case of an error, returns
81   // net::OK without invoking callback.
82   // Note: GetState() (and possibly last_error()) should be checked upon
83   // completion to determine whether the Refresh encountered an error.
84   net::Error Refresh(const base::Closure& callback, int byte_limit);
85 
86   // Rebuilds the buffer state by copying over any unread data to the beginning
87   // of the buffer and resetting the buffer read/write positions.
88   // Note: it is not valid to call RebuildBuffer() if GetState() == CLOSED. The
89   // stream must be recreated from scratch in such a scenario.
90   void RebuildBuffer();
91 
92   // Returns the last fatal error encountered. Only valid if GetState() ==
93   // CLOSED.
94   net::Error last_error() const;
95 
96   // Returns the current state.
97   State GetState() const;
98 
99  private:
100   // Clears the local state.
101   void ResetInternal();
102 
103   // Callback for Socket::Read calls.
104   void RefreshCompletionCallback(const base::Closure& callback, int result);
105 
106   // Permanently closes the stream.
107   void CloseStream(net::Error error, const base::Closure& callback);
108 
109   // Internal net components.
110   net::StreamSocket* const socket_;
111   const scoped_refptr<net::IOBuffer> io_buffer_;
112   // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
113   // been written to yet by Socket::Read calls.
114   const scoped_refptr<net::DrainableIOBuffer> read_buffer_;
115 
116   // Starting position of the data within |io_buffer_| to consume on subsequent
117   // Next(..) call.  0 <= next_pos_ <= read_buffer_->BytesConsumed()
118   // Note: next_pos_ == read_buffer_->BytesConsumed() implies GetState() == EMPTY.
119   int next_pos_;
120 
121   // If < net::ERR_IO_PENDING, the last net error received.
122   // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING.
123   net::Error last_error_;
124 
125   base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_;
126 
127   DISALLOW_COPY_AND_ASSIGN(SocketInputStream);
128 };
129 
130 // A helper class for writing protobuf encoded data to a net::StreamSocket.
131 // A SocketOutputStream does not take ownership of the socket itself, and it is
132 // expected that the life of the output stream should match the life of the
133 // socket itself (while the socket remains connected).
134 // Typical usage:
135 // 1. Check the GetState() of the output stream before using it. If CLOSED, the
136 //    output stream must be rebuilt (and the socket likely needs to be
137 //    reconnected, as an error was encountered).
138 // 2. If EMPTY, the output stream can be written via a CodedOutputStream using
139 //    the ZeroCopyOutputStream interface.
140 // 3. Once done writing, GetState() should be READY, so call Flush(..) to write
141 //    the buffer into the StreamSocket. Wait for the callback to be invoked
142 //    (it's invalid to write to an output stream while it's flushing).
143 // 4. Check the GetState() again to ensure the Flush was successful. GetState()
144 //    should be EMPTY again.
145 // 5. Repeat.
146 class GCM_EXPORT SocketOutputStream
147     : public google::protobuf::io::ZeroCopyOutputStream {
148  public:
149   enum State {
150     // No valid data yet.
151     EMPTY,
152     // Ready for flushing (some data is present).
153     READY,
154     // In the process of flushing into the socket.
155     FLUSHING,
156     // A permanent error occurred, and the stream is now closed.
157     CLOSED,
158   };
159 
160   // |socket| should already be connected; it is not owned by this stream.
161   explicit SocketOutputStream(net::StreamSocket* socket);
162   virtual ~SocketOutputStream();
163 
164   // ZeroCopyOutputStream implementation.
165   virtual bool Next(void** data, int* size) OVERRIDE;
166   virtual void BackUp(int count) OVERRIDE;
167   virtual int64 ByteCount() const OVERRIDE;
168 
169   // Writes the buffer into the socket. Wait for |callback| before writing again.
170   net::Error Flush(const base::Closure& callback);
171 
172   // Returns the last fatal error encountered. Only valid if GetState() ==
173   // CLOSED.
174   net::Error last_error() const;
175 
176   // Returns the current state.
177   State GetState() const;
178 
179  private:
180   void FlushCompletionCallback(const base::Closure& callback, int result);  // NOTE(review): presumably the socket write completion handler for Flush(); confirm in the .cc.
181 
182   // Internal net components.
183   net::StreamSocket* const socket_;
184   const scoped_refptr<net::IOBuffer> io_buffer_;
185   // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
186   // been written to the socket yet.
187   const scoped_refptr<net::DrainableIOBuffer> write_buffer_;
188 
189   // Starting position of the data within |io_buffer_| to consume on subsequent
190   // Next(..) call.  0 <= write_buffer_->BytesConsumed() <= next_pos_
191   // Note: next_pos_ == 0 implies GetState() == EMPTY.
192   int next_pos_;
193 
194   // If < net::ERR_IO_PENDING, the last net error received.
195   // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING.
196   net::Error last_error_;
197 
198   base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_;
199 
200   DISALLOW_COPY_AND_ASSIGN(SocketOutputStream);
201 };
202 
203 }  // namespace gcm
204 
205 #endif  // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
206