1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chromeos/dbus/pipe_reader.h"
6
7 #include "base/bind.h"
8 #include "base/posix/eintr_wrapper.h"
9 #include "base/task_runner.h"
10 #include "net/base/file_stream.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13
14 namespace chromeos {
15
PipeReader(const scoped_refptr<base::TaskRunner> & task_runner,const IOCompleteCallback & callback)16 PipeReader::PipeReader(const scoped_refptr<base::TaskRunner>& task_runner,
17 const IOCompleteCallback& callback)
18 : io_buffer_(new net::IOBufferWithSize(4096)),
19 task_runner_(task_runner),
20 callback_(callback),
21 weak_ptr_factory_(this) {}
22
~PipeReader()23 PipeReader::~PipeReader() {
24 }
25
StartIO()26 base::File PipeReader::StartIO() {
27 // Use a pipe to collect data
28 int pipe_fds[2];
29 const int status = HANDLE_EINTR(pipe(pipe_fds));
30 if (status < 0) {
31 PLOG(ERROR) << "pipe";
32 return base::File();
33 }
34 base::File pipe_write_end(pipe_fds[1]);
35 // Pass ownership of pipe_fds[0] to data_stream_, which will close it.
36 data_stream_.reset(new net::FileStream(
37 base::File(pipe_fds[0]), task_runner_));
38
39 // Post an initial async read to setup data collection
40 int rv = data_stream_->Read(
41 io_buffer_.get(), io_buffer_->size(),
42 base::Bind(&PipeReader::OnDataReady, weak_ptr_factory_.GetWeakPtr()));
43 if (rv != net::ERR_IO_PENDING) {
44 LOG(ERROR) << "Unable to post initial read";
45 return base::File();
46 }
47 return pipe_write_end.Pass();
48 }
49
OnDataReady(int byte_count)50 void PipeReader::OnDataReady(int byte_count) {
51 DVLOG(1) << "OnDataReady byte_count " << byte_count;
52 if (byte_count <= 0) {
53 callback_.Run(); // signal creator to take data and delete us
54 return;
55 }
56
57 AcceptData(io_buffer_->data(), byte_count);
58
59 // Post another read
60 int rv = data_stream_->Read(
61 io_buffer_.get(), io_buffer_->size(),
62 base::Bind(&PipeReader::OnDataReady, weak_ptr_factory_.GetWeakPtr()));
63 if (rv != net::ERR_IO_PENDING) {
64 LOG(ERROR) << "Unable to post another read";
65 // TODO(sleffler) do something more intelligent?
66 }
67 }
68
PipeReaderForString(const scoped_refptr<base::TaskRunner> & task_runner,const IOCompleteCallback & callback)69 PipeReaderForString::PipeReaderForString(
70 const scoped_refptr<base::TaskRunner>& task_runner,
71 const IOCompleteCallback& callback)
72 : PipeReader(task_runner, callback) {
73 }
74
AcceptData(const char * data,int byte_count)75 void PipeReaderForString::AcceptData(const char *data, int byte_count) {
76 data_.append(data, byte_count);
77 }
78
GetData(std::string * data)79 void PipeReaderForString::GetData(std::string* data) {
80 data_.swap(*data);
81 }
82
83 } // namespace chromeos
84