1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <brillo/streams/fake_stream.h>
6
7 #include <algorithm>
8
9 #include <base/bind.h>
10 #include <brillo/message_loops/message_loop.h>
11 #include <brillo/streams/stream_utils.h>
12
13 namespace brillo {
14
15 namespace {
16
17 // Gets a delta between the two times, makes sure that the delta is positive.
CalculateDelay(const base::Time & now,const base::Time & delay_until)18 base::TimeDelta CalculateDelay(const base::Time& now,
19 const base::Time& delay_until) {
20 const base::TimeDelta zero_delay;
21 if (delay_until.is_null() || now >= delay_until) {
22 return zero_delay;
23 }
24
25 base::TimeDelta delay = delay_until - now;
26 if (delay < zero_delay)
27 delay = zero_delay;
28 return delay;
29 }
30
31 // Given the current clock time, and expected delays for read and write
32 // operations calculates the smaller wait delay of the two and sets the
33 // resulting operation to |*mode| and the delay to wait for into |*delay|.
GetMinDelayAndMode(const base::Time & now,bool read,const base::Time & delay_read_until,bool write,const base::Time & delay_write_until,Stream::AccessMode * mode,base::TimeDelta * delay)34 void GetMinDelayAndMode(const base::Time& now,
35 bool read, const base::Time& delay_read_until,
36 bool write, const base::Time& delay_write_until,
37 Stream::AccessMode* mode, base::TimeDelta* delay) {
38 base::TimeDelta read_delay = base::TimeDelta::Max();
39 base::TimeDelta write_delay = base::TimeDelta::Max();
40
41 if (read)
42 read_delay = CalculateDelay(now, delay_read_until);
43 if (write)
44 write_delay = CalculateDelay(now, delay_write_until);
45
46 if (read_delay > write_delay) {
47 read = false;
48 } else if (read_delay < write_delay) {
49 write = false;
50 }
51 *mode = stream_utils::MakeAccessMode(read, write);
52 *delay = std::min(read_delay, write_delay);
53 }
54
55 } // anonymous namespace
56
FakeStream(Stream::AccessMode mode,base::Clock * clock)57 FakeStream::FakeStream(Stream::AccessMode mode,
58 base::Clock* clock)
59 : mode_{mode}, clock_{clock} {}
60
AddReadPacketData(base::TimeDelta delay,const void * data,size_t size)61 void FakeStream::AddReadPacketData(base::TimeDelta delay,
62 const void* data,
63 size_t size) {
64 auto* byte_ptr = static_cast<const uint8_t*>(data);
65 AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
66 }
67
AddReadPacketData(base::TimeDelta delay,brillo::Blob data)68 void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
69 InputDataPacket packet;
70 packet.data = std::move(data);
71 packet.delay_before = delay;
72 incoming_queue_.push(std::move(packet));
73 }
74
AddReadPacketString(base::TimeDelta delay,const std::string & data)75 void FakeStream::AddReadPacketString(base::TimeDelta delay,
76 const std::string& data) {
77 AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
78 }
79
QueueReadError(base::TimeDelta delay)80 void FakeStream::QueueReadError(base::TimeDelta delay) {
81 QueueReadErrorWithMessage(delay, std::string{});
82 }
83
QueueReadErrorWithMessage(base::TimeDelta delay,const std::string & message)84 void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
85 const std::string& message) {
86 InputDataPacket packet;
87 packet.data.assign(message.begin(), message.end());
88 packet.delay_before = delay;
89 packet.read_error = true;
90 incoming_queue_.push(std::move(packet));
91 }
92
ClearReadQueue()93 void FakeStream::ClearReadQueue() {
94 std::queue<InputDataPacket>().swap(incoming_queue_);
95 delay_input_until_ = base::Time{};
96 input_buffer_.clear();
97 input_ptr_ = 0;
98 report_read_error_ = 0;
99 }
100
ExpectWritePacketSize(base::TimeDelta delay,size_t data_size)101 void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
102 size_t data_size) {
103 OutputDataPacket packet;
104 packet.expected_size = data_size;
105 packet.delay_before = delay;
106 outgoing_queue_.push(std::move(packet));
107 }
108
ExpectWritePacketData(base::TimeDelta delay,const void * data,size_t size)109 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
110 const void* data,
111 size_t size) {
112 auto* byte_ptr = static_cast<const uint8_t*>(data);
113 ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
114 }
115
ExpectWritePacketData(base::TimeDelta delay,brillo::Blob data)116 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
117 brillo::Blob data) {
118 OutputDataPacket packet;
119 packet.expected_size = data.size();
120 packet.data = std::move(data);
121 packet.delay_before = delay;
122 outgoing_queue_.push(std::move(packet));
123 }
124
ExpectWritePacketString(base::TimeDelta delay,const std::string & data)125 void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
126 const std::string& data) {
127 ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
128 }
129
QueueWriteError(base::TimeDelta delay)130 void FakeStream::QueueWriteError(base::TimeDelta delay) {
131 QueueWriteErrorWithMessage(delay, std::string{});
132 }
133
QueueWriteErrorWithMessage(base::TimeDelta delay,const std::string & message)134 void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
135 const std::string& message) {
136 OutputDataPacket packet;
137 packet.expected_size = 0;
138 packet.data.assign(message.begin(), message.end());
139 packet.delay_before = delay;
140 packet.write_error = true;
141 outgoing_queue_.push(std::move(packet));
142 }
143
ClearWriteQueue()144 void FakeStream::ClearWriteQueue() {
145 std::queue<OutputDataPacket>().swap(outgoing_queue_);
146 delay_output_until_ = base::Time{};
147 output_buffer_.clear();
148 expected_output_data_.clear();
149 max_output_buffer_size_ = 0;
150 all_output_data_.clear();
151 report_write_error_ = 0;
152 }
153
GetFlushedOutputData() const154 const brillo::Blob& FakeStream::GetFlushedOutputData() const {
155 return all_output_data_;
156 }
157
GetFlushedOutputDataAsString() const158 std::string FakeStream::GetFlushedOutputDataAsString() const {
159 return std::string{all_output_data_.begin(), all_output_data_.end()};
160 }
161
CanRead() const162 bool FakeStream::CanRead() const {
163 return stream_utils::IsReadAccessMode(mode_);
164 }
165
CanWrite() const166 bool FakeStream::CanWrite() const {
167 return stream_utils::IsWriteAccessMode(mode_);
168 }
169
SetSizeBlocking(uint64_t,ErrorPtr * error)170 bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
171 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
172 }
173
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)174 bool FakeStream::Seek(int64_t /* offset */,
175 Whence /* whence */,
176 uint64_t* /* new_position */,
177 ErrorPtr* error) {
178 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
179 }
180
IsReadBufferEmpty() const181 bool FakeStream::IsReadBufferEmpty() const {
182 return input_ptr_ >= input_buffer_.size();
183 }
184
PopReadPacket()185 bool FakeStream::PopReadPacket() {
186 if (incoming_queue_.empty())
187 return false;
188 const InputDataPacket& packet = incoming_queue_.front();
189 input_ptr_ = 0;
190 input_buffer_ = std::move(packet.data);
191 delay_input_until_ = clock_->Now() + packet.delay_before;
192 incoming_queue_.pop();
193 report_read_error_ = packet.read_error;
194 return true;
195 }
196
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)197 bool FakeStream::ReadNonBlocking(void* buffer,
198 size_t size_to_read,
199 size_t* size_read,
200 bool* end_of_stream,
201 ErrorPtr* error) {
202 if (!CanRead())
203 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
204
205 if (!IsOpen())
206 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
207
208 for (;;) {
209 if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
210 *size_read = 0;
211 if (end_of_stream)
212 *end_of_stream = false;
213 break;
214 }
215
216 if (report_read_error_) {
217 report_read_error_ = false;
218 std::string message{input_buffer_.begin(), input_buffer_.end()};
219 if (message.empty())
220 message = "Simulating read error for tests";
221 input_buffer_.clear();
222 Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
223 return false;
224 }
225
226 if (!IsReadBufferEmpty()) {
227 size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
228 std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
229 input_ptr_ += size_to_read;
230 *size_read = size_to_read;
231 if (end_of_stream)
232 *end_of_stream = false;
233 break;
234 }
235
236 if (!PopReadPacket()) {
237 *size_read = 0;
238 if (end_of_stream)
239 *end_of_stream = true;
240 break;
241 }
242 }
243 return true;
244 }
245
IsWriteBufferFull() const246 bool FakeStream::IsWriteBufferFull() const {
247 return output_buffer_.size() >= max_output_buffer_size_;
248 }
249
PopWritePacket()250 bool FakeStream::PopWritePacket() {
251 if (outgoing_queue_.empty())
252 return false;
253 const OutputDataPacket& packet = outgoing_queue_.front();
254 expected_output_data_ = std::move(packet.data);
255 delay_output_until_ = clock_->Now() + packet.delay_before;
256 max_output_buffer_size_ = packet.expected_size;
257 report_write_error_ = packet.write_error;
258 outgoing_queue_.pop();
259 return true;
260 }
261
WriteNonBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)262 bool FakeStream::WriteNonBlocking(const void* buffer,
263 size_t size_to_write,
264 size_t* size_written,
265 ErrorPtr* error) {
266 if (!CanWrite())
267 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
268
269 if (!IsOpen())
270 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
271
272 for (;;) {
273 if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
274 *size_written = 0;
275 return true;
276 }
277
278 if (report_write_error_) {
279 report_write_error_ = false;
280 std::string message{expected_output_data_.begin(),
281 expected_output_data_.end()};
282 if (message.empty())
283 message = "Simulating write error for tests";
284 output_buffer_.clear();
285 max_output_buffer_size_ = 0;
286 expected_output_data_.clear();
287 Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
288 return false;
289 }
290
291 if (!IsWriteBufferFull()) {
292 bool success = true;
293 size_to_write = std::min(size_to_write,
294 max_output_buffer_size_ - output_buffer_.size());
295 auto byte_ptr = static_cast<const uint8_t*>(buffer);
296 output_buffer_.insert(output_buffer_.end(),
297 byte_ptr, byte_ptr + size_to_write);
298 if (output_buffer_.size() == max_output_buffer_size_) {
299 if (!expected_output_data_.empty() &&
300 expected_output_data_ != output_buffer_) {
301 // We expected different data to be written, report an error.
302 Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
303 "Unexpected data written");
304 success = false;
305 }
306
307 all_output_data_.insert(all_output_data_.end(),
308 output_buffer_.begin(), output_buffer_.end());
309
310 output_buffer_.clear();
311 max_output_buffer_size_ = 0;
312 expected_output_data_.clear();
313 }
314 *size_written = size_to_write;
315 return success;
316 }
317
318 if (!PopWritePacket()) {
319 // No more data expected.
320 Error::AddTo(error, FROM_HERE, "fake_stream", "full",
321 "No more output data expected");
322 return false;
323 }
324 }
325 }
326
FlushBlocking(ErrorPtr * error)327 bool FakeStream::FlushBlocking(ErrorPtr* error) {
328 if (!CanWrite())
329 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
330
331 if (!IsOpen())
332 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
333
334 bool success = true;
335 if (!output_buffer_.empty()) {
336 if (!expected_output_data_.empty() &&
337 expected_output_data_ != output_buffer_) {
338 // We expected different data to be written, report an error.
339 Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
340 "Unexpected data written");
341 success = false;
342 }
343 all_output_data_.insert(all_output_data_.end(),
344 output_buffer_.begin(), output_buffer_.end());
345
346 output_buffer_.clear();
347 max_output_buffer_size_ = 0;
348 expected_output_data_.clear();
349 }
350 return success;
351 }
352
CloseBlocking(ErrorPtr *)353 bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
354 is_open_ = false;
355 return true;
356 }
357
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)358 bool FakeStream::WaitForData(AccessMode mode,
359 const base::Callback<void(AccessMode)>& callback,
360 ErrorPtr* error) {
361 bool read_requested = stream_utils::IsReadAccessMode(mode);
362 bool write_requested = stream_utils::IsWriteAccessMode(mode);
363
364 if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
365 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
366
367 if (read_requested && IsReadBufferEmpty())
368 PopReadPacket();
369 if (write_requested && IsWriteBufferFull())
370 PopWritePacket();
371
372 base::TimeDelta delay;
373 GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
374 write_requested, delay_output_until_, &mode, &delay);
375 MessageLoop::current()->PostDelayedTask(
376 FROM_HERE, base::Bind(callback, mode), delay);
377 return true;
378 }
379
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)380 bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
381 base::TimeDelta timeout,
382 AccessMode* out_mode,
383 ErrorPtr* error) {
384 bool read_requested = stream_utils::IsReadAccessMode(in_mode);
385 bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
386
387 if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
388 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
389
390 base::TimeDelta delay;
391 GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
392 write_requested, delay_output_until_, out_mode, &delay);
393
394 if (timeout < delay)
395 return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
396
397 LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
398 << " ms.";
399
400 return true;
401 }
402
403 } // namespace brillo
404