• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/streams/fake_stream.h>
6 
7 #include <algorithm>
8 
9 #include <base/bind.h>
10 #include <brillo/message_loops/message_loop.h>
11 #include <brillo/streams/stream_utils.h>
12 
13 namespace brillo {
14 
15 namespace {
16 
17 // Gets a delta between the two times, makes sure that the delta is positive.
CalculateDelay(const base::Time & now,const base::Time & delay_until)18 base::TimeDelta CalculateDelay(const base::Time& now,
19                                const base::Time& delay_until) {
20   const base::TimeDelta zero_delay;
21   if (delay_until.is_null() || now >= delay_until) {
22     return zero_delay;
23   }
24 
25   base::TimeDelta delay = delay_until - now;
26   if (delay < zero_delay)
27     delay = zero_delay;
28   return delay;
29 }
30 
31 // Given the current clock time, and expected delays for read and write
32 // operations calculates the smaller wait delay of the two and sets the
33 // resulting operation to |*mode| and the delay to wait for into |*delay|.
GetMinDelayAndMode(const base::Time & now,bool read,const base::Time & delay_read_until,bool write,const base::Time & delay_write_until,Stream::AccessMode * mode,base::TimeDelta * delay)34 void GetMinDelayAndMode(const base::Time& now,
35                         bool read, const base::Time& delay_read_until,
36                         bool write, const base::Time& delay_write_until,
37                         Stream::AccessMode* mode, base::TimeDelta* delay) {
38   base::TimeDelta read_delay = base::TimeDelta::Max();
39   base::TimeDelta write_delay = base::TimeDelta::Max();
40 
41   if (read)
42     read_delay = CalculateDelay(now, delay_read_until);
43   if (write)
44     write_delay = CalculateDelay(now, delay_write_until);
45 
46   if (read_delay > write_delay) {
47     read = false;
48   } else if (read_delay < write_delay) {
49     write = false;
50   }
51   *mode = stream_utils::MakeAccessMode(read, write);
52   *delay = std::min(read_delay, write_delay);
53 }
54 
55 }  // anonymous namespace
56 
FakeStream(Stream::AccessMode mode,base::Clock * clock)57 FakeStream::FakeStream(Stream::AccessMode mode,
58                        base::Clock* clock)
59     : mode_{mode}, clock_{clock} {}
60 
AddReadPacketData(base::TimeDelta delay,const void * data,size_t size)61 void FakeStream::AddReadPacketData(base::TimeDelta delay,
62                                    const void* data,
63                                    size_t size) {
64   auto* byte_ptr = static_cast<const uint8_t*>(data);
65   AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
66 }
67 
AddReadPacketData(base::TimeDelta delay,brillo::Blob data)68 void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
69   InputDataPacket packet;
70   packet.data = std::move(data);
71   packet.delay_before = delay;
72   incoming_queue_.push(std::move(packet));
73 }
74 
AddReadPacketString(base::TimeDelta delay,const std::string & data)75 void FakeStream::AddReadPacketString(base::TimeDelta delay,
76                                      const std::string& data) {
77   AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
78 }
79 
QueueReadError(base::TimeDelta delay)80 void FakeStream::QueueReadError(base::TimeDelta delay) {
81   QueueReadErrorWithMessage(delay, std::string{});
82 }
83 
QueueReadErrorWithMessage(base::TimeDelta delay,const std::string & message)84 void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
85                                            const std::string& message) {
86   InputDataPacket packet;
87   packet.data.assign(message.begin(), message.end());
88   packet.delay_before = delay;
89   packet.read_error = true;
90   incoming_queue_.push(std::move(packet));
91 }
92 
ClearReadQueue()93 void FakeStream::ClearReadQueue() {
94   std::queue<InputDataPacket>().swap(incoming_queue_);
95   delay_input_until_ = base::Time{};
96   input_buffer_.clear();
97   input_ptr_ = 0;
98   report_read_error_ = 0;
99 }
100 
ExpectWritePacketSize(base::TimeDelta delay,size_t data_size)101 void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
102                                        size_t data_size) {
103   OutputDataPacket packet;
104   packet.expected_size = data_size;
105   packet.delay_before = delay;
106   outgoing_queue_.push(std::move(packet));
107 }
108 
ExpectWritePacketData(base::TimeDelta delay,const void * data,size_t size)109 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
110                                        const void* data,
111                                        size_t size) {
112   auto* byte_ptr = static_cast<const uint8_t*>(data);
113   ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
114 }
115 
ExpectWritePacketData(base::TimeDelta delay,brillo::Blob data)116 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
117                                        brillo::Blob data) {
118   OutputDataPacket packet;
119   packet.expected_size = data.size();
120   packet.data = std::move(data);
121   packet.delay_before = delay;
122   outgoing_queue_.push(std::move(packet));
123 }
124 
ExpectWritePacketString(base::TimeDelta delay,const std::string & data)125 void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
126                                          const std::string& data) {
127   ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
128 }
129 
QueueWriteError(base::TimeDelta delay)130 void FakeStream::QueueWriteError(base::TimeDelta delay) {
131   QueueWriteErrorWithMessage(delay, std::string{});
132 }
133 
QueueWriteErrorWithMessage(base::TimeDelta delay,const std::string & message)134 void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
135                                             const std::string& message) {
136   OutputDataPacket packet;
137   packet.expected_size = 0;
138   packet.data.assign(message.begin(), message.end());
139   packet.delay_before = delay;
140   packet.write_error = true;
141   outgoing_queue_.push(std::move(packet));
142 }
143 
ClearWriteQueue()144 void FakeStream::ClearWriteQueue() {
145   std::queue<OutputDataPacket>().swap(outgoing_queue_);
146   delay_output_until_ = base::Time{};
147   output_buffer_.clear();
148   expected_output_data_.clear();
149   max_output_buffer_size_ = 0;
150   all_output_data_.clear();
151   report_write_error_ = 0;
152 }
153 
GetFlushedOutputData() const154 const brillo::Blob& FakeStream::GetFlushedOutputData() const {
155   return all_output_data_;
156 }
157 
GetFlushedOutputDataAsString() const158 std::string FakeStream::GetFlushedOutputDataAsString() const {
159   return std::string{all_output_data_.begin(), all_output_data_.end()};
160 }
161 
CanRead() const162 bool FakeStream::CanRead() const {
163   return stream_utils::IsReadAccessMode(mode_);
164 }
165 
CanWrite() const166 bool FakeStream::CanWrite() const {
167   return stream_utils::IsWriteAccessMode(mode_);
168 }
169 
SetSizeBlocking(uint64_t,ErrorPtr * error)170 bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
171   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
172 }
173 
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)174 bool FakeStream::Seek(int64_t /* offset */,
175                       Whence /* whence */,
176                       uint64_t* /* new_position */,
177                       ErrorPtr* error) {
178   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
179 }
180 
IsReadBufferEmpty() const181 bool FakeStream::IsReadBufferEmpty() const {
182   return input_ptr_ >= input_buffer_.size();
183 }
184 
PopReadPacket()185 bool FakeStream::PopReadPacket() {
186   if (incoming_queue_.empty())
187     return false;
188   const InputDataPacket& packet = incoming_queue_.front();
189   input_ptr_ = 0;
190   input_buffer_ = std::move(packet.data);
191   delay_input_until_ = clock_->Now() + packet.delay_before;
192   incoming_queue_.pop();
193   report_read_error_ = packet.read_error;
194   return true;
195 }
196 
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)197 bool FakeStream::ReadNonBlocking(void* buffer,
198                                  size_t size_to_read,
199                                  size_t* size_read,
200                                  bool* end_of_stream,
201                                  ErrorPtr* error) {
202   if (!CanRead())
203     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
204 
205   if (!IsOpen())
206     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
207 
208   for (;;) {
209     if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
210       *size_read = 0;
211       if (end_of_stream)
212         *end_of_stream = false;
213       break;
214     }
215 
216     if (report_read_error_) {
217       report_read_error_ = false;
218       std::string message{input_buffer_.begin(), input_buffer_.end()};
219       if (message.empty())
220         message = "Simulating read error for tests";
221       input_buffer_.clear();
222       Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
223       return false;
224     }
225 
226     if (!IsReadBufferEmpty()) {
227       size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
228       std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
229       input_ptr_ += size_to_read;
230       *size_read = size_to_read;
231       if (end_of_stream)
232         *end_of_stream = false;
233       break;
234     }
235 
236     if (!PopReadPacket()) {
237       *size_read = 0;
238       if (end_of_stream)
239         *end_of_stream = true;
240       break;
241     }
242   }
243   return true;
244 }
245 
IsWriteBufferFull() const246 bool FakeStream::IsWriteBufferFull() const {
247   return output_buffer_.size() >= max_output_buffer_size_;
248 }
249 
PopWritePacket()250 bool FakeStream::PopWritePacket() {
251   if (outgoing_queue_.empty())
252     return false;
253   const OutputDataPacket& packet = outgoing_queue_.front();
254   expected_output_data_ = std::move(packet.data);
255   delay_output_until_ = clock_->Now() + packet.delay_before;
256   max_output_buffer_size_ = packet.expected_size;
257   report_write_error_ = packet.write_error;
258   outgoing_queue_.pop();
259   return true;
260 }
261 
WriteNonBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)262 bool FakeStream::WriteNonBlocking(const void* buffer,
263                                   size_t size_to_write,
264                                   size_t* size_written,
265                                   ErrorPtr* error) {
266   if (!CanWrite())
267     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
268 
269   if (!IsOpen())
270     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
271 
272   for (;;) {
273     if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
274       *size_written = 0;
275       return true;
276     }
277 
278     if (report_write_error_) {
279       report_write_error_ = false;
280       std::string message{expected_output_data_.begin(),
281                           expected_output_data_.end()};
282       if (message.empty())
283         message = "Simulating write error for tests";
284       output_buffer_.clear();
285       max_output_buffer_size_ = 0;
286       expected_output_data_.clear();
287       Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
288       return false;
289     }
290 
291     if (!IsWriteBufferFull()) {
292       bool success = true;
293       size_to_write = std::min(size_to_write,
294                                max_output_buffer_size_ - output_buffer_.size());
295       auto byte_ptr = static_cast<const uint8_t*>(buffer);
296       output_buffer_.insert(output_buffer_.end(),
297                             byte_ptr, byte_ptr + size_to_write);
298       if (output_buffer_.size()  == max_output_buffer_size_) {
299         if (!expected_output_data_.empty() &&
300             expected_output_data_ != output_buffer_) {
301           // We expected different data to be written, report an error.
302           Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
303                        "Unexpected data written");
304           success = false;
305         }
306 
307         all_output_data_.insert(all_output_data_.end(),
308                                 output_buffer_.begin(), output_buffer_.end());
309 
310         output_buffer_.clear();
311         max_output_buffer_size_ = 0;
312         expected_output_data_.clear();
313       }
314       *size_written = size_to_write;
315       return success;
316     }
317 
318     if (!PopWritePacket()) {
319       // No more data expected.
320       Error::AddTo(error, FROM_HERE, "fake_stream", "full",
321                    "No more output data expected");
322       return false;
323     }
324   }
325 }
326 
FlushBlocking(ErrorPtr * error)327 bool FakeStream::FlushBlocking(ErrorPtr* error) {
328   if (!CanWrite())
329     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
330 
331   if (!IsOpen())
332     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
333 
334   bool success = true;
335   if (!output_buffer_.empty()) {
336     if (!expected_output_data_.empty() &&
337         expected_output_data_ != output_buffer_) {
338       // We expected different data to be written, report an error.
339       Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
340                    "Unexpected data written");
341       success = false;
342     }
343     all_output_data_.insert(all_output_data_.end(),
344                             output_buffer_.begin(), output_buffer_.end());
345 
346     output_buffer_.clear();
347     max_output_buffer_size_ = 0;
348     expected_output_data_.clear();
349   }
350   return success;
351 }
352 
CloseBlocking(ErrorPtr *)353 bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
354   is_open_ = false;
355   return true;
356 }
357 
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)358 bool FakeStream::WaitForData(AccessMode mode,
359                              const base::Callback<void(AccessMode)>& callback,
360                              ErrorPtr* error) {
361   bool read_requested = stream_utils::IsReadAccessMode(mode);
362   bool write_requested = stream_utils::IsWriteAccessMode(mode);
363 
364   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
365     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
366 
367   if (read_requested && IsReadBufferEmpty())
368     PopReadPacket();
369   if (write_requested && IsWriteBufferFull())
370     PopWritePacket();
371 
372   base::TimeDelta delay;
373   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
374                      write_requested, delay_output_until_, &mode, &delay);
375   MessageLoop::current()->PostDelayedTask(
376       FROM_HERE, base::Bind(callback, mode), delay);
377   return true;
378 }
379 
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)380 bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
381                                      base::TimeDelta timeout,
382                                      AccessMode* out_mode,
383                                      ErrorPtr* error) {
384   bool read_requested = stream_utils::IsReadAccessMode(in_mode);
385   bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
386 
387   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
388     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
389 
390   base::TimeDelta delay;
391   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
392                      write_requested, delay_output_until_, out_mode, &delay);
393 
394   if (timeout < delay)
395     return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
396 
397   LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
398             << " ms.";
399 
400   return true;
401 }
402 
403 }  // namespace brillo
404