1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/host/linux/audio_pipe_reader.h"
6
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11
12 #include "base/logging.h"
13 #include "base/posix/eintr_wrapper.h"
14 #include "base/stl_util.h"
15
16 namespace remoting {
17
18 namespace {
19
20 const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate *
21 AudioPipeReader::kChannels *
22 AudioPipeReader::kBytesPerSample;
23
24 // Read data from the pipe every 40ms.
25 const int kCapturingPeriodMs = 40;
26
27 // Size of the pipe buffer in milliseconds.
28 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
29
30 // Size of the pipe buffer in bytes.
31 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
32 base::Time::kMillisecondsPerSecond;
33
34 #if !defined(F_SETPIPE_SZ)
35 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
36 // to compile this code on machines with older kernel.
37 #define F_SETPIPE_SZ 1031
38 #endif // defined(F_SETPIPE_SZ)
39
40 } // namespace
41
42 // static
Create(scoped_refptr<base::SingleThreadTaskRunner> task_runner,const base::FilePath & pipe_path)43 scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
44 scoped_refptr<base::SingleThreadTaskRunner> task_runner,
45 const base::FilePath& pipe_path) {
46 // Create a reference to the new AudioPipeReader before posting the
47 // StartOnAudioThread task, otherwise it may be deleted on the audio
48 // thread before we return.
49 scoped_refptr<AudioPipeReader> pipe_reader =
50 new AudioPipeReader(task_runner, pipe_path);
51 task_runner->PostTask(
52 FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader));
53 return pipe_reader;
54 }
55
AudioPipeReader(scoped_refptr<base::SingleThreadTaskRunner> task_runner,const base::FilePath & pipe_path)56 AudioPipeReader::AudioPipeReader(
57 scoped_refptr<base::SingleThreadTaskRunner> task_runner,
58 const base::FilePath& pipe_path)
59 : task_runner_(task_runner),
60 pipe_path_(pipe_path),
61 observers_(new ObserverListThreadSafe<StreamObserver>()) {
62 }
63
~AudioPipeReader()64 AudioPipeReader::~AudioPipeReader() {}
65
AddObserver(StreamObserver * observer)66 void AudioPipeReader::AddObserver(StreamObserver* observer) {
67 observers_->AddObserver(observer);
68 }
RemoveObserver(StreamObserver * observer)69 void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
70 observers_->RemoveObserver(observer);
71 }
72
OnFileCanReadWithoutBlocking(int fd)73 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
74 DCHECK_EQ(fd, pipe_.GetPlatformFile());
75 StartTimer();
76 }
77
OnFileCanWriteWithoutBlocking(int fd)78 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
79 NOTREACHED();
80 }
81
StartOnAudioThread()82 void AudioPipeReader::StartOnAudioThread() {
83 DCHECK(task_runner_->BelongsToCurrentThread());
84
85 if (!file_watcher_.Watch(pipe_path_.DirName(), true,
86 base::Bind(&AudioPipeReader::OnDirectoryChanged,
87 base::Unretained(this)))) {
88 LOG(ERROR) << "Failed to watch pulseaudio directory "
89 << pipe_path_.DirName().value();
90 }
91
92 TryOpenPipe();
93 }
94
OnDirectoryChanged(const base::FilePath & path,bool error)95 void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path,
96 bool error) {
97 DCHECK(task_runner_->BelongsToCurrentThread());
98
99 if (error) {
100 LOG(ERROR) << "File watcher returned an error.";
101 return;
102 }
103
104 TryOpenPipe();
105 }
106
TryOpenPipe()107 void AudioPipeReader::TryOpenPipe() {
108 DCHECK(task_runner_->BelongsToCurrentThread());
109
110 base::File new_pipe;
111 new_pipe.Initialize(
112 pipe_path_,
113 base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC);
114
115 // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
116 // file descriptors. Don't need to do anything if inode hasn't changed.
117 if (new_pipe.IsValid() && pipe_.IsValid()) {
118 struct stat old_stat;
119 struct stat new_stat;
120 if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 &&
121 fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 &&
122 old_stat.st_ino == new_stat.st_ino) {
123 return;
124 }
125 }
126
127 file_descriptor_watcher_.StopWatchingFileDescriptor();
128 timer_.Stop();
129
130 pipe_ = new_pipe.Pass();
131
132 if (pipe_.IsValid()) {
133 // Set O_NONBLOCK flag.
134 if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) {
135 PLOG(ERROR) << "fcntl";
136 pipe_.Close();
137 return;
138 }
139
140 // Set buffer size for the pipe.
141 if (HANDLE_EINTR(fcntl(
142 pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) {
143 PLOG(ERROR) << "fcntl";
144 }
145
146 WaitForPipeReadable();
147 }
148 }
149
StartTimer()150 void AudioPipeReader::StartTimer() {
151 DCHECK(task_runner_->BelongsToCurrentThread());
152 started_time_ = base::TimeTicks::Now();
153 last_capture_position_ = 0;
154 timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
155 this, &AudioPipeReader::DoCapture);
156 }
157
DoCapture()158 void AudioPipeReader::DoCapture() {
159 DCHECK(task_runner_->BelongsToCurrentThread());
160 DCHECK(pipe_.IsValid());
161
162 // Calculate how much we need read from the pipe. Pulseaudio doesn't control
163 // how much data it writes to the pipe, so we need to pace the stream.
164 base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
165 int64 stream_position_bytes = stream_position.InMilliseconds() *
166 kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
167 int64 bytes_to_read = stream_position_bytes - last_capture_position_;
168
169 std::string data = left_over_bytes_;
170 size_t pos = data.size();
171 left_over_bytes_.clear();
172 data.resize(pos + bytes_to_read);
173
174 while (pos < data.size()) {
175 int read_result =
176 pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos);
177 if (read_result > 0) {
178 pos += read_result;
179 } else {
180 if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
181 PLOG(ERROR) << "read";
182 break;
183 }
184 }
185
186 // Stop reading from the pipe if PulseAudio isn't writing anything.
187 if (pos == 0) {
188 WaitForPipeReadable();
189 return;
190 }
191
192 // Save any incomplete samples we've read for later. Each packet should
193 // contain integer number of samples.
194 int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
195 left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
196 incomplete_samples_bytes);
197 data.resize(pos - incomplete_samples_bytes);
198
199 last_capture_position_ += data.size();
200 // Normally PulseAudio will keep pipe buffer full, so we should always be able
201 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
202 // sure that |stream_position_bytes| doesn't go out of sync with the current
203 // stream position.
204 if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
205 last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
206 DCHECK_LE(last_capture_position_, stream_position_bytes);
207
208 // Dispatch asynchronous notification to the stream observers.
209 scoped_refptr<base::RefCountedString> data_ref =
210 base::RefCountedString::TakeString(&data);
211 observers_->Notify(&StreamObserver::OnDataRead, data_ref);
212 }
213
WaitForPipeReadable()214 void AudioPipeReader::WaitForPipeReadable() {
215 timer_.Stop();
216 base::MessageLoopForIO::current()->WatchFileDescriptor(
217 pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ,
218 &file_descriptor_watcher_, this);
219 }
220
221 // static
Destruct(const AudioPipeReader * audio_pipe_reader)222 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
223 audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
224 }
225
226 } // namespace remoting
227