• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "device/serial/data_sink_receiver.h"
6 
7 #include <limits>
8 
9 #include "base/bind.h"
10 #include "device/serial/async_waiter.h"
11 
12 namespace device {
13 
14 // Represents a flush of data that has not been completed.
15 class DataSinkReceiver::PendingFlush {
16  public:
17   PendingFlush();
18 
19   // Initializes this PendingFlush with |num_bytes|, the number of bytes to
20   // flush.
21   void SetNumBytesToFlush(uint32_t num_bytes);
22 
23   // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
24   // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
25   // |bytes_to_flush_| bytes were flushed or the error if one is encountered
26   // discarding data from |handle|.
27   MojoResult Flush(mojo::DataPipeConsumerHandle handle);
28 
29   // Whether this PendingFlush has received the number of bytes to flush.
received_flush()30   bool received_flush() { return received_flush_; }
31 
32  private:
33   // Whether this PendingFlush has received the number of bytes to flush.
34   bool received_flush_;
35 
36   // The remaining number of bytes to flush.
37   uint32_t bytes_to_flush_;
38 };
39 
40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
41 // a DataSinkReceiver.
42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
43  public:
44   Buffer(scoped_refptr<DataSinkReceiver> receiver,
45          const char* buffer,
46          uint32_t buffer_size);
47   virtual ~Buffer();
48 
49   void Cancel(int32_t error);
50 
51   // ReadOnlyBuffer overrides.
52   virtual const char* GetData() OVERRIDE;
53   virtual uint32_t GetSize() OVERRIDE;
54   virtual void Done(uint32_t bytes_read) OVERRIDE;
55   virtual void DoneWithError(uint32_t bytes_read, int32_t error) OVERRIDE;
56 
57  private:
58   // The DataSinkReceiver whose data pipe we are providing a view.
59   scoped_refptr<DataSinkReceiver> receiver_;
60 
61   const char* buffer_;
62   uint32_t buffer_size_;
63 
64   // Whether this receive has been cancelled.
65   bool cancelled_;
66 
67   // If |cancelled_|, contains the cancellation error to report.
68   int32_t cancellation_error_;
69 };
70 
DataSinkReceiver(const ReadyCallback & ready_callback,const CancelCallback & cancel_callback,const ErrorCallback & error_callback)71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
72                                    const CancelCallback& cancel_callback,
73                                    const ErrorCallback& error_callback)
74     : ready_callback_(ready_callback),
75       cancel_callback_(cancel_callback),
76       error_callback_(error_callback),
77       buffer_in_use_(NULL),
78       shut_down_(false),
79       weak_factory_(this) {
80 }
81 
ShutDown()82 void DataSinkReceiver::ShutDown() {
83   shut_down_ = true;
84   if (waiter_)
85     waiter_.reset();
86 }
87 
~DataSinkReceiver()88 DataSinkReceiver::~DataSinkReceiver() {
89 }
90 
Init(mojo::ScopedDataPipeConsumerHandle handle)91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) {
92   if (handle_.is_valid()) {
93     DispatchFatalError();
94     return;
95   }
96 
97   handle_ = handle.Pass();
98   StartWaiting();
99 }
100 
Cancel(int32_t error)101 void DataSinkReceiver::Cancel(int32_t error) {
102   // If we have sent a ReportBytesSentAndError but have not received the
103   // response, that ReportBytesSentAndError message will appear to the
104   // DataSinkClient to be caused by this Cancel message. In that case, we ignore
105   // the cancel.
106   if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush())
107     return;
108 
109   // If there is a buffer is in use, mark the buffer as cancelled and notify the
110   // client by calling |cancel_callback_|. The sink implementation may or may
111   // not take the cancellation into account when deciding what error (if any) to
112   // return. If the sink returns an error, we ignore the cancellation error.
113   // Otherwise, if the sink does not report an error, we override that with the
114   // cancellation error. Once a cancellation has been received, the next report
115   // sent to the client will always contain an error; the error returned by the
116   // sink or the cancellation error if the sink does not return an error.
117   if (buffer_in_use_) {
118     buffer_in_use_->Cancel(error);
119     if (!cancel_callback_.is_null())
120       cancel_callback_.Run(error);
121     return;
122   }
123   // If there is no buffer in use, immediately report the error and cancel the
124   // waiting for the data pipe if one exists. This transitions straight into the
125   // state after the sink has returned an error.
126   waiter_.reset();
127   ReportBytesSentAndError(0, error);
128 }
129 
OnConnectionError()130 void DataSinkReceiver::OnConnectionError() {
131   DispatchFatalError();
132 }
133 
StartWaiting()134 void DataSinkReceiver::StartWaiting() {
135   DCHECK(!waiter_ && !shut_down_);
136   waiter_.reset(
137       new AsyncWaiter(handle_.get(),
138                       MOJO_HANDLE_SIGNAL_READABLE,
139                       base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
140 }
141 
OnDoneWaiting(MojoResult result)142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
143   DCHECK(waiter_ && !shut_down_);
144   waiter_.reset();
145   if (result != MOJO_RESULT_OK) {
146     DispatchFatalError();
147     return;
148   }
149   // If there are any queued flushes (from ReportBytesSentAndError()), let them
150   // flush data from the data pipe.
151   if (!pending_flushes_.empty()) {
152     MojoResult result = pending_flushes_.front()->Flush(handle_.get());
153     if (result == MOJO_RESULT_OK) {
154       pending_flushes_.pop();
155     } else if (result != MOJO_RESULT_SHOULD_WAIT) {
156       DispatchFatalError();
157       return;
158     }
159     StartWaiting();
160     return;
161   }
162   const void* data = NULL;
163   uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
164   result = mojo::BeginReadDataRaw(
165       handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
166   if (result != MOJO_RESULT_OK) {
167     DispatchFatalError();
168     return;
169   }
170   buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
171   ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
172 }
173 
Done(uint32_t bytes_read)174 void DataSinkReceiver::Done(uint32_t bytes_read) {
175   if (!DoneInternal(bytes_read))
176     return;
177   client()->ReportBytesSent(bytes_read);
178   StartWaiting();
179 }
180 
DoneWithError(uint32_t bytes_read,int32_t error)181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
182   if (!DoneInternal(bytes_read))
183     return;
184   ReportBytesSentAndError(bytes_read, error);
185 }
186 
DoneInternal(uint32_t bytes_read)187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
188   if (shut_down_)
189     return false;
190 
191   DCHECK(buffer_in_use_);
192   buffer_in_use_ = NULL;
193   MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read);
194   if (result != MOJO_RESULT_OK) {
195     DispatchFatalError();
196     return false;
197   }
198   return true;
199 }
200 
ReportBytesSentAndError(uint32_t bytes_read,int32_t error)201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
202                                                int32_t error) {
203   // When we encounter an error, we must discard the data from any sends already
204   // in the data pipe before we can resume dispatching data to the sink. We add
205   // a pending flush here. The response containing the number of bytes to flush
206   // is handled in SetNumBytesToFlush(). The actual flush is handled in
207   // OnDoneWaiting().
208   pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
209   client()->ReportBytesSentAndError(
210       bytes_read,
211       error,
212       base::Bind(&DataSinkReceiver::SetNumBytesToFlush,
213                  weak_factory_.GetWeakPtr()));
214 }
215 
SetNumBytesToFlush(uint32_t bytes_to_flush)216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) {
217   DCHECK(!pending_flushes_.empty());
218   DCHECK(!pending_flushes_.back()->received_flush());
219   pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush);
220   if (!waiter_)
221     StartWaiting();
222 }
223 
DispatchFatalError()224 void DataSinkReceiver::DispatchFatalError() {
225   if (shut_down_)
226     return;
227 
228   ShutDown();
229   if (!error_callback_.is_null())
230     error_callback_.Run();
231 }
232 
Buffer(scoped_refptr<DataSinkReceiver> receiver,const char * buffer,uint32_t buffer_size)233 DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver,
234                                  const char* buffer,
235                                  uint32_t buffer_size)
236     : receiver_(receiver),
237       buffer_(buffer),
238       buffer_size_(buffer_size),
239       cancelled_(false),
240       cancellation_error_(0) {
241 }
242 
~Buffer()243 DataSinkReceiver::Buffer::~Buffer() {
244   if (!receiver_.get())
245     return;
246   if (cancelled_)
247     receiver_->DoneWithError(0, cancellation_error_);
248   else
249     receiver_->Done(0);
250 }
251 
Cancel(int32_t error)252 void DataSinkReceiver::Buffer::Cancel(int32_t error) {
253   cancelled_ = true;
254   cancellation_error_ = error;
255 }
256 
GetData()257 const char* DataSinkReceiver::Buffer::GetData() {
258   return buffer_;
259 }
260 
GetSize()261 uint32_t DataSinkReceiver::Buffer::GetSize() {
262   return buffer_size_;
263 }
264 
Done(uint32_t bytes_read)265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
266   if (cancelled_)
267     receiver_->DoneWithError(bytes_read, cancellation_error_);
268   else
269     receiver_->Done(bytes_read);
270   receiver_ = NULL;
271   buffer_ = NULL;
272   buffer_size_ = 0;
273 }
274 
DoneWithError(uint32_t bytes_read,int32_t error)275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
276                                              int32_t error) {
277   receiver_->DoneWithError(bytes_read, error);
278   receiver_ = NULL;
279   buffer_ = NULL;
280   buffer_size_ = 0;
281 }
282 
PendingFlush()283 DataSinkReceiver::PendingFlush::PendingFlush()
284     : received_flush_(false), bytes_to_flush_(0) {
285 }
286 
SetNumBytesToFlush(uint32_t num_bytes)287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
288   DCHECK(!received_flush_);
289   received_flush_ = true;
290   bytes_to_flush_ = num_bytes;
291 }
292 
Flush(mojo::DataPipeConsumerHandle handle)293 MojoResult DataSinkReceiver::PendingFlush::Flush(
294     mojo::DataPipeConsumerHandle handle) {
295   DCHECK(received_flush_);
296   uint32_t num_bytes = bytes_to_flush_;
297   MojoResult result =
298       mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
299   if (result != MOJO_RESULT_OK)
300     return result;
301   DCHECK(num_bytes <= bytes_to_flush_);
302   bytes_to_flush_ -= num_bytes;
303   return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
304 }
305 
306 }  // namespace device
307