1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "device/serial/data_sink_receiver.h"
6
7 #include <limits>
8
9 #include "base/bind.h"
10 #include "device/serial/async_waiter.h"
11
12 namespace device {
13
14 // Represents a flush of data that has not been completed.
15 class DataSinkReceiver::PendingFlush {
16 public:
17 PendingFlush();
18
19 // Initializes this PendingFlush with |num_bytes|, the number of bytes to
20 // flush.
21 void SetNumBytesToFlush(uint32_t num_bytes);
22
23 // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
24 // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
25 // |bytes_to_flush_| bytes were flushed or the error if one is encountered
26 // discarding data from |handle|.
27 MojoResult Flush(mojo::DataPipeConsumerHandle handle);
28
29 // Whether this PendingFlush has received the number of bytes to flush.
received_flush()30 bool received_flush() { return received_flush_; }
31
32 private:
33 // Whether this PendingFlush has received the number of bytes to flush.
34 bool received_flush_;
35
36 // The remaining number of bytes to flush.
37 uint32_t bytes_to_flush_;
38 };
39
40 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
41 // a DataSinkReceiver.
42 class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
43 public:
44 Buffer(scoped_refptr<DataSinkReceiver> receiver,
45 const char* buffer,
46 uint32_t buffer_size);
47 virtual ~Buffer();
48
49 void Cancel(int32_t error);
50
51 // ReadOnlyBuffer overrides.
52 virtual const char* GetData() OVERRIDE;
53 virtual uint32_t GetSize() OVERRIDE;
54 virtual void Done(uint32_t bytes_read) OVERRIDE;
55 virtual void DoneWithError(uint32_t bytes_read, int32_t error) OVERRIDE;
56
57 private:
58 // The DataSinkReceiver whose data pipe we are providing a view.
59 scoped_refptr<DataSinkReceiver> receiver_;
60
61 const char* buffer_;
62 uint32_t buffer_size_;
63
64 // Whether this receive has been cancelled.
65 bool cancelled_;
66
67 // If |cancelled_|, contains the cancellation error to report.
68 int32_t cancellation_error_;
69 };
70
DataSinkReceiver(const ReadyCallback & ready_callback,const CancelCallback & cancel_callback,const ErrorCallback & error_callback)71 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
72 const CancelCallback& cancel_callback,
73 const ErrorCallback& error_callback)
74 : ready_callback_(ready_callback),
75 cancel_callback_(cancel_callback),
76 error_callback_(error_callback),
77 buffer_in_use_(NULL),
78 shut_down_(false),
79 weak_factory_(this) {
80 }
81
ShutDown()82 void DataSinkReceiver::ShutDown() {
83 shut_down_ = true;
84 if (waiter_)
85 waiter_.reset();
86 }
87
~DataSinkReceiver()88 DataSinkReceiver::~DataSinkReceiver() {
89 }
90
Init(mojo::ScopedDataPipeConsumerHandle handle)91 void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) {
92 if (handle_.is_valid()) {
93 DispatchFatalError();
94 return;
95 }
96
97 handle_ = handle.Pass();
98 StartWaiting();
99 }
100
Cancel(int32_t error)101 void DataSinkReceiver::Cancel(int32_t error) {
102 // If we have sent a ReportBytesSentAndError but have not received the
103 // response, that ReportBytesSentAndError message will appear to the
104 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
105 // the cancel.
106 if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush())
107 return;
108
109 // If there is a buffer is in use, mark the buffer as cancelled and notify the
110 // client by calling |cancel_callback_|. The sink implementation may or may
111 // not take the cancellation into account when deciding what error (if any) to
112 // return. If the sink returns an error, we ignore the cancellation error.
113 // Otherwise, if the sink does not report an error, we override that with the
114 // cancellation error. Once a cancellation has been received, the next report
115 // sent to the client will always contain an error; the error returned by the
116 // sink or the cancellation error if the sink does not return an error.
117 if (buffer_in_use_) {
118 buffer_in_use_->Cancel(error);
119 if (!cancel_callback_.is_null())
120 cancel_callback_.Run(error);
121 return;
122 }
123 // If there is no buffer in use, immediately report the error and cancel the
124 // waiting for the data pipe if one exists. This transitions straight into the
125 // state after the sink has returned an error.
126 waiter_.reset();
127 ReportBytesSentAndError(0, error);
128 }
129
OnConnectionError()130 void DataSinkReceiver::OnConnectionError() {
131 DispatchFatalError();
132 }
133
StartWaiting()134 void DataSinkReceiver::StartWaiting() {
135 DCHECK(!waiter_ && !shut_down_);
136 waiter_.reset(
137 new AsyncWaiter(handle_.get(),
138 MOJO_HANDLE_SIGNAL_READABLE,
139 base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
140 }
141
OnDoneWaiting(MojoResult result)142 void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
143 DCHECK(waiter_ && !shut_down_);
144 waiter_.reset();
145 if (result != MOJO_RESULT_OK) {
146 DispatchFatalError();
147 return;
148 }
149 // If there are any queued flushes (from ReportBytesSentAndError()), let them
150 // flush data from the data pipe.
151 if (!pending_flushes_.empty()) {
152 MojoResult result = pending_flushes_.front()->Flush(handle_.get());
153 if (result == MOJO_RESULT_OK) {
154 pending_flushes_.pop();
155 } else if (result != MOJO_RESULT_SHOULD_WAIT) {
156 DispatchFatalError();
157 return;
158 }
159 StartWaiting();
160 return;
161 }
162 const void* data = NULL;
163 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
164 result = mojo::BeginReadDataRaw(
165 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
166 if (result != MOJO_RESULT_OK) {
167 DispatchFatalError();
168 return;
169 }
170 buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
171 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
172 }
173
Done(uint32_t bytes_read)174 void DataSinkReceiver::Done(uint32_t bytes_read) {
175 if (!DoneInternal(bytes_read))
176 return;
177 client()->ReportBytesSent(bytes_read);
178 StartWaiting();
179 }
180
DoneWithError(uint32_t bytes_read,int32_t error)181 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
182 if (!DoneInternal(bytes_read))
183 return;
184 ReportBytesSentAndError(bytes_read, error);
185 }
186
DoneInternal(uint32_t bytes_read)187 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
188 if (shut_down_)
189 return false;
190
191 DCHECK(buffer_in_use_);
192 buffer_in_use_ = NULL;
193 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read);
194 if (result != MOJO_RESULT_OK) {
195 DispatchFatalError();
196 return false;
197 }
198 return true;
199 }
200
ReportBytesSentAndError(uint32_t bytes_read,int32_t error)201 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
202 int32_t error) {
203 // When we encounter an error, we must discard the data from any sends already
204 // in the data pipe before we can resume dispatching data to the sink. We add
205 // a pending flush here. The response containing the number of bytes to flush
206 // is handled in SetNumBytesToFlush(). The actual flush is handled in
207 // OnDoneWaiting().
208 pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
209 client()->ReportBytesSentAndError(
210 bytes_read,
211 error,
212 base::Bind(&DataSinkReceiver::SetNumBytesToFlush,
213 weak_factory_.GetWeakPtr()));
214 }
215
SetNumBytesToFlush(uint32_t bytes_to_flush)216 void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) {
217 DCHECK(!pending_flushes_.empty());
218 DCHECK(!pending_flushes_.back()->received_flush());
219 pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush);
220 if (!waiter_)
221 StartWaiting();
222 }
223
DispatchFatalError()224 void DataSinkReceiver::DispatchFatalError() {
225 if (shut_down_)
226 return;
227
228 ShutDown();
229 if (!error_callback_.is_null())
230 error_callback_.Run();
231 }
232
Buffer(scoped_refptr<DataSinkReceiver> receiver,const char * buffer,uint32_t buffer_size)233 DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver,
234 const char* buffer,
235 uint32_t buffer_size)
236 : receiver_(receiver),
237 buffer_(buffer),
238 buffer_size_(buffer_size),
239 cancelled_(false),
240 cancellation_error_(0) {
241 }
242
~Buffer()243 DataSinkReceiver::Buffer::~Buffer() {
244 if (!receiver_.get())
245 return;
246 if (cancelled_)
247 receiver_->DoneWithError(0, cancellation_error_);
248 else
249 receiver_->Done(0);
250 }
251
Cancel(int32_t error)252 void DataSinkReceiver::Buffer::Cancel(int32_t error) {
253 cancelled_ = true;
254 cancellation_error_ = error;
255 }
256
GetData()257 const char* DataSinkReceiver::Buffer::GetData() {
258 return buffer_;
259 }
260
GetSize()261 uint32_t DataSinkReceiver::Buffer::GetSize() {
262 return buffer_size_;
263 }
264
Done(uint32_t bytes_read)265 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
266 if (cancelled_)
267 receiver_->DoneWithError(bytes_read, cancellation_error_);
268 else
269 receiver_->Done(bytes_read);
270 receiver_ = NULL;
271 buffer_ = NULL;
272 buffer_size_ = 0;
273 }
274
DoneWithError(uint32_t bytes_read,int32_t error)275 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
276 int32_t error) {
277 receiver_->DoneWithError(bytes_read, error);
278 receiver_ = NULL;
279 buffer_ = NULL;
280 buffer_size_ = 0;
281 }
282
PendingFlush()283 DataSinkReceiver::PendingFlush::PendingFlush()
284 : received_flush_(false), bytes_to_flush_(0) {
285 }
286
SetNumBytesToFlush(uint32_t num_bytes)287 void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
288 DCHECK(!received_flush_);
289 received_flush_ = true;
290 bytes_to_flush_ = num_bytes;
291 }
292
Flush(mojo::DataPipeConsumerHandle handle)293 MojoResult DataSinkReceiver::PendingFlush::Flush(
294 mojo::DataPipeConsumerHandle handle) {
295 DCHECK(received_flush_);
296 uint32_t num_bytes = bytes_to_flush_;
297 MojoResult result =
298 mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
299 if (result != MOJO_RESULT_OK)
300 return result;
301 DCHECK(num_bytes <= bytes_to_flush_);
302 bytes_to_flush_ -= num_bytes;
303 return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
304 }
305
306 } // namespace device
307