• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "device/serial/data_receiver.h"
6 
7 #include <limits>
8 
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
12 
13 namespace device {
14 
15 // Represents a receive that is not yet fulfilled.
16 class DataReceiver::PendingReceive {
17  public:
18   PendingReceive(DataReceiver* receiver,
19                  const ReceiveDataCallback& callback,
20                  const ReceiveErrorCallback& error_callback,
21                  int32_t fatal_error_value);
22 
23   // Dispatches |data| to |receive_callback_|.
24   void DispatchData(const void* data, uint32_t num_bytes);
25 
26   // Reports |error| to |receive_error_callback_| if it is an appropriate time.
27   // Returns whether it dispatched |error|.
28   bool DispatchError(DataReceiver::PendingError* error,
29                      uint32_t bytes_received);
30 
31   // Reports |fatal_error_value_| to |receive_error_callback_|.
32   void DispatchFatalError();
33 
34  private:
35   class Buffer;
36 
37   // Invoked when the user is finished with the ReadOnlyBuffer provided to
38   // |receive_callback_|.
39   void Done(uint32_t num_bytes);
40 
41   // The DataReceiver that owns this.
42   DataReceiver* receiver_;
43 
44   // The callback to dispatch data.
45   ReceiveDataCallback receive_callback_;
46 
47   // The callback to report errors.
48   ReceiveErrorCallback receive_error_callback_;
49 
50   // The error value to report when DispatchFatalError() is called.
51   const int32_t fatal_error_value_;
52 
53   // True if the user owns a buffer passed to |receive_callback_| as part of
54   // DispatchData().
55   bool buffer_in_use_;
56 };
57 
58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
59 // a DataReceiver.
60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
61  public:
62   Buffer(scoped_refptr<DataReceiver> pipe,
63          PendingReceive* receive,
64          const char* buffer,
65          uint32_t buffer_size);
66   virtual ~Buffer();
67 
68   // ReadOnlyBuffer overrides.
69   virtual const char* GetData() OVERRIDE;
70   virtual uint32_t GetSize() OVERRIDE;
71   virtual void Done(uint32_t bytes_consumed) OVERRIDE;
72   virtual void DoneWithError(uint32_t bytes_consumed, int32_t error) OVERRIDE;
73 
74  private:
75   // The DataReceiver whose data pipe we are providing a view.
76   scoped_refptr<DataReceiver> receiver_;
77 
78   // The PendingReceive to which this buffer has been created in response.
79   PendingReceive* pending_receive_;
80 
81   const char* buffer_;
82   uint32_t buffer_size_;
83 };
84 
85 // Represents an error received from the DataSource.
86 struct DataReceiver::PendingError {
PendingErrordevice::DataReceiver::PendingError87   PendingError(uint32_t offset, int32_t error)
88       : offset(offset), error(error), dispatched(false) {}
89 
90   // The location within the data stream where the error occurred.
91   const uint32_t offset;
92 
93   // The value of the error that occurred.
94   const int32_t error;
95 
96   // Whether the error has been dispatched to the user.
97   bool dispatched;
98 };
99 
DataReceiver(mojo::InterfacePtr<serial::DataSource> source,uint32_t buffer_size,int32_t fatal_error_value)100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
101                            uint32_t buffer_size,
102                            int32_t fatal_error_value)
103     : source_(source.Pass()),
104       fatal_error_value_(fatal_error_value),
105       bytes_received_(0),
106       shut_down_(false),
107       weak_factory_(this) {
108   MojoCreateDataPipeOptions options = {
109       sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
110   };
111   mojo::ScopedDataPipeProducerHandle remote_handle;
112   MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
113   DCHECK_EQ(MOJO_RESULT_OK, result);
114   source_->Init(remote_handle.Pass());
115   source_.set_client(this);
116 }
117 
Receive(const ReceiveDataCallback & callback,const ReceiveErrorCallback & error_callback)118 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
119                            const ReceiveErrorCallback& error_callback) {
120   DCHECK(!callback.is_null() && !error_callback.is_null());
121   if (pending_receive_ || shut_down_)
122     return false;
123   // When the DataSource encounters an error, it pauses transmission. When the
124   // user starts a new receive following notification of the error (via
125   // |error_callback| of the previous Receive call) of the error we can tell the
126   // DataSource to resume transmission of data.
127   if (pending_error_ && pending_error_->dispatched) {
128     source_->Resume();
129     pending_error_.reset();
130   }
131 
132   pending_receive_.reset(
133       new PendingReceive(this, callback, error_callback, fatal_error_value_));
134   base::MessageLoop::current()->PostTask(
135       FROM_HERE,
136       base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
137   return true;
138 }
139 
~DataReceiver()140 DataReceiver::~DataReceiver() {
141   ShutDown();
142 }
143 
OnError(uint32_t offset,int32_t error)144 void DataReceiver::OnError(uint32_t offset, int32_t error) {
145   if (shut_down_)
146     return;
147 
148   if (pending_error_) {
149     // When OnError is called by the DataSource, transmission of data is
150     // suspended. Thus we shouldn't receive another call to OnError until we
151     // have fully dealt with the error and called Resume to resume transmission
152     // (see Receive()). Under normal operation we should never get here, but if
153     // we do (e.g. in the case of a hijacked service process) just shut down.
154     ShutDown();
155     return;
156   }
157   pending_error_.reset(new PendingError(offset, error));
158   if (pending_receive_ &&
159       pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
160     pending_receive_.reset();
161     waiter_.reset();
162   }
163 }
164 
OnConnectionError()165 void DataReceiver::OnConnectionError() {
166   ShutDown();
167 }
168 
Done(uint32_t bytes_consumed)169 void DataReceiver::Done(uint32_t bytes_consumed) {
170   if (shut_down_)
171     return;
172 
173   DCHECK(pending_receive_);
174   MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
175   DCHECK_EQ(MOJO_RESULT_OK, result);
176   pending_receive_.reset();
177   bytes_received_ += bytes_consumed;
178 }
179 
OnDoneWaiting(MojoResult result)180 void DataReceiver::OnDoneWaiting(MojoResult result) {
181   DCHECK(pending_receive_ && !shut_down_ && waiter_);
182   waiter_.reset();
183   if (result != MOJO_RESULT_OK) {
184     ShutDown();
185     return;
186   }
187   ReceiveInternal();
188 }
189 
ReceiveInternal()190 void DataReceiver::ReceiveInternal() {
191   if (shut_down_)
192     return;
193   DCHECK(pending_receive_);
194   if (pending_error_ &&
195       pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
196     pending_receive_.reset();
197     waiter_.reset();
198     return;
199   }
200 
201   const void* data;
202   uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
203   MojoResult result = mojo::BeginReadDataRaw(
204       handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
205   if (result == MOJO_RESULT_OK) {
206     if (!CheckErrorNotInReadRange(num_bytes)) {
207       ShutDown();
208       return;
209     }
210 
211     pending_receive_->DispatchData(data, num_bytes);
212     return;
213   }
214   if (result == MOJO_RESULT_SHOULD_WAIT) {
215     waiter_.reset(new AsyncWaiter(
216         handle_.get(),
217         MOJO_HANDLE_SIGNAL_READABLE,
218         base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
219     return;
220   }
221   ShutDown();
222 }
223 
CheckErrorNotInReadRange(uint32_t num_bytes)224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
225   DCHECK(pending_receive_);
226   if (!pending_error_)
227     return true;
228 
229   DCHECK_NE(bytes_received_, pending_error_->offset);
230   DCHECK_NE(num_bytes, 0u);
231   uint32_t potential_bytes_received = bytes_received_ + num_bytes;
232   // bytes_received_ can overflow so we must consider two cases:
233   //   1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
234   //      equal number of times. In this case, |potential_bytes_received| must
235   //      be in the range (|bytes_received|, |pending_error_->offset|]. Below
236   //      this range can only occur if |bytes_received_| overflows before
237   //      |pending_error_->offset|. Above can only occur if |bytes_received_|
238   //      overtakes |pending_error_->offset|.
239   //   2. |pending_error_->offset| has overflowed once more than
240   //      |bytes_received_|. In this case, |potential_bytes_received| must not
241   //      be in the range (|pending_error_->offset|, |bytes_received_|].
242   if ((bytes_received_ < pending_error_->offset &&
243        (potential_bytes_received > pending_error_->offset ||
244         potential_bytes_received <= bytes_received_)) ||
245       (bytes_received_ > pending_error_->offset &&
246        potential_bytes_received > pending_error_->offset &&
247        potential_bytes_received <= bytes_received_)) {
248     return false;
249   }
250   return true;
251 }
252 
ShutDown()253 void DataReceiver::ShutDown() {
254   shut_down_ = true;
255   if (pending_receive_)
256     pending_receive_->DispatchFatalError();
257   pending_error_.reset();
258   waiter_.reset();
259 }
260 
PendingReceive(DataReceiver * receiver,const ReceiveDataCallback & callback,const ReceiveErrorCallback & error_callback,int32_t fatal_error_value)261 DataReceiver::PendingReceive::PendingReceive(
262     DataReceiver* receiver,
263     const ReceiveDataCallback& callback,
264     const ReceiveErrorCallback& error_callback,
265     int32_t fatal_error_value)
266     : receiver_(receiver),
267       receive_callback_(callback),
268       receive_error_callback_(error_callback),
269       fatal_error_value_(fatal_error_value),
270       buffer_in_use_(false) {
271 }
272 
DispatchData(const void * data,uint32_t num_bytes)273 void DataReceiver::PendingReceive::DispatchData(const void* data,
274                                                 uint32_t num_bytes) {
275   DCHECK(!buffer_in_use_);
276   buffer_in_use_ = true;
277   receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
278       new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
279 }
280 
DispatchError(PendingError * error,uint32_t bytes_received)281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
282                                                  uint32_t bytes_received) {
283   DCHECK(!error->dispatched);
284   if (buffer_in_use_ || bytes_received != error->offset)
285     return false;
286 
287   error->dispatched = true;
288   receive_error_callback_.Run(error->error);
289   return true;
290 }
291 
DispatchFatalError()292 void DataReceiver::PendingReceive::DispatchFatalError() {
293   receive_error_callback_.Run(fatal_error_value_);
294 }
295 
Done(uint32_t bytes_consumed)296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
297   DCHECK(buffer_in_use_);
298   buffer_in_use_ = false;
299   receiver_->Done(bytes_consumed);
300 }
301 
Buffer(scoped_refptr<DataReceiver> receiver,PendingReceive * receive,const char * buffer,uint32_t buffer_size)302 DataReceiver::PendingReceive::Buffer::Buffer(
303     scoped_refptr<DataReceiver> receiver,
304     PendingReceive* receive,
305     const char* buffer,
306     uint32_t buffer_size)
307     : receiver_(receiver),
308       pending_receive_(receive),
309       buffer_(buffer),
310       buffer_size_(buffer_size) {
311 }
312 
~Buffer()313 DataReceiver::PendingReceive::Buffer::~Buffer() {
314   if (pending_receive_)
315     pending_receive_->Done(0);
316 }
317 
GetData()318 const char* DataReceiver::PendingReceive::Buffer::GetData() {
319   return buffer_;
320 }
321 
GetSize()322 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
323   return buffer_size_;
324 }
325 
Done(uint32_t bytes_consumed)326 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
327   pending_receive_->Done(bytes_consumed);
328   pending_receive_ = NULL;
329   receiver_ = NULL;
330   buffer_ = NULL;
331   buffer_size_ = 0;
332 }
333 
DoneWithError(uint32_t bytes_consumed,int32_t error)334 void DataReceiver::PendingReceive::Buffer::DoneWithError(
335     uint32_t bytes_consumed,
336     int32_t error) {
337   Done(bytes_consumed);
338 }
339 
340 }  // namespace device
341