• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/host/linux/audio_pipe_reader.h"
6 
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #include "base/logging.h"
13 #include "base/posix/eintr_wrapper.h"
14 #include "base/stl_util.h"
15 
16 namespace remoting {
17 
18 namespace {
19 
20 const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate *
21                                   AudioPipeReader::kChannels *
22                                   AudioPipeReader::kBytesPerSample;
23 
24 // Read data from the pipe every 40ms.
25 const int kCapturingPeriodMs = 40;
26 
27 // Size of the pipe buffer in milliseconds.
28 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
29 
30 // Size of the pipe buffer in bytes.
31 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
32     base::Time::kMillisecondsPerSecond;
33 
34 #if !defined(F_SETPIPE_SZ)
35 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
36 // to compile this code on machines with older kernel.
37 #define F_SETPIPE_SZ 1031
38 #endif  // defined(F_SETPIPE_SZ)
39 
40 }  // namespace
41 
42 // static
Create(scoped_refptr<base::SingleThreadTaskRunner> task_runner,const base::FilePath & pipe_path)43 scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
44     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
45     const base::FilePath& pipe_path) {
46   // Create a reference to the new AudioPipeReader before posting the
47   // StartOnAudioThread task, otherwise it may be deleted on the audio
48   // thread before we return.
49   scoped_refptr<AudioPipeReader> pipe_reader =
50       new AudioPipeReader(task_runner, pipe_path);
51   task_runner->PostTask(
52       FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader));
53   return pipe_reader;
54 }
55 
AudioPipeReader(scoped_refptr<base::SingleThreadTaskRunner> task_runner,const base::FilePath & pipe_path)56 AudioPipeReader::AudioPipeReader(
57     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
58     const base::FilePath& pipe_path)
59     : task_runner_(task_runner),
60       pipe_path_(pipe_path),
61       observers_(new ObserverListThreadSafe<StreamObserver>()) {
62 }
63 
~AudioPipeReader()64 AudioPipeReader::~AudioPipeReader() {}
65 
AddObserver(StreamObserver * observer)66 void AudioPipeReader::AddObserver(StreamObserver* observer) {
67   observers_->AddObserver(observer);
68 }
RemoveObserver(StreamObserver * observer)69 void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
70   observers_->RemoveObserver(observer);
71 }
72 
OnFileCanReadWithoutBlocking(int fd)73 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
74   DCHECK_EQ(fd, pipe_.GetPlatformFile());
75   StartTimer();
76 }
77 
OnFileCanWriteWithoutBlocking(int fd)78 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
79   NOTREACHED();
80 }
81 
StartOnAudioThread()82 void AudioPipeReader::StartOnAudioThread() {
83   DCHECK(task_runner_->BelongsToCurrentThread());
84 
85   if (!file_watcher_.Watch(pipe_path_.DirName(), true,
86                            base::Bind(&AudioPipeReader::OnDirectoryChanged,
87                                       base::Unretained(this)))) {
88     LOG(ERROR) << "Failed to watch pulseaudio directory "
89                << pipe_path_.DirName().value();
90   }
91 
92   TryOpenPipe();
93 }
94 
OnDirectoryChanged(const base::FilePath & path,bool error)95 void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path,
96                                          bool error) {
97   DCHECK(task_runner_->BelongsToCurrentThread());
98 
99   if (error) {
100     LOG(ERROR) << "File watcher returned an error.";
101     return;
102   }
103 
104   TryOpenPipe();
105 }
106 
TryOpenPipe()107 void AudioPipeReader::TryOpenPipe() {
108   DCHECK(task_runner_->BelongsToCurrentThread());
109 
110   base::File new_pipe;
111   new_pipe.Initialize(
112       pipe_path_,
113       base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC);
114 
115   // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
116   // file descriptors. Don't need to do anything if inode hasn't changed.
117   if (new_pipe.IsValid() && pipe_.IsValid()) {
118     struct stat old_stat;
119     struct stat new_stat;
120     if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 &&
121         fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 &&
122         old_stat.st_ino == new_stat.st_ino) {
123       return;
124     }
125   }
126 
127   file_descriptor_watcher_.StopWatchingFileDescriptor();
128   timer_.Stop();
129 
130   pipe_ = new_pipe.Pass();
131 
132   if (pipe_.IsValid()) {
133     // Set O_NONBLOCK flag.
134     if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) {
135       PLOG(ERROR) << "fcntl";
136       pipe_.Close();
137       return;
138     }
139 
140     // Set buffer size for the pipe.
141     if (HANDLE_EINTR(fcntl(
142             pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) {
143       PLOG(ERROR) << "fcntl";
144     }
145 
146     WaitForPipeReadable();
147   }
148 }
149 
StartTimer()150 void AudioPipeReader::StartTimer() {
151   DCHECK(task_runner_->BelongsToCurrentThread());
152   started_time_ = base::TimeTicks::Now();
153   last_capture_position_ = 0;
154   timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
155                this, &AudioPipeReader::DoCapture);
156 }
157 
DoCapture()158 void AudioPipeReader::DoCapture() {
159   DCHECK(task_runner_->BelongsToCurrentThread());
160   DCHECK(pipe_.IsValid());
161 
162   // Calculate how much we need read from the pipe. Pulseaudio doesn't control
163   // how much data it writes to the pipe, so we need to pace the stream.
164   base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
165   int64 stream_position_bytes = stream_position.InMilliseconds() *
166       kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
167   int64 bytes_to_read = stream_position_bytes - last_capture_position_;
168 
169   std::string data = left_over_bytes_;
170   size_t pos = data.size();
171   left_over_bytes_.clear();
172   data.resize(pos + bytes_to_read);
173 
174   while (pos < data.size()) {
175     int read_result =
176         pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos);
177     if (read_result > 0) {
178       pos += read_result;
179     } else {
180       if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
181         PLOG(ERROR) << "read";
182       break;
183     }
184   }
185 
186   // Stop reading from the pipe if PulseAudio isn't writing anything.
187   if (pos == 0) {
188     WaitForPipeReadable();
189     return;
190   }
191 
192   // Save any incomplete samples we've read for later. Each packet should
193   // contain integer number of samples.
194   int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
195   left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
196                           incomplete_samples_bytes);
197   data.resize(pos - incomplete_samples_bytes);
198 
199   last_capture_position_ += data.size();
200   // Normally PulseAudio will keep pipe buffer full, so we should always be able
201   // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
202   // sure that |stream_position_bytes| doesn't go out of sync with the current
203   // stream position.
204   if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
205     last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
206   DCHECK_LE(last_capture_position_, stream_position_bytes);
207 
208   // Dispatch asynchronous notification to the stream observers.
209   scoped_refptr<base::RefCountedString> data_ref =
210       base::RefCountedString::TakeString(&data);
211   observers_->Notify(&StreamObserver::OnDataRead, data_ref);
212 }
213 
WaitForPipeReadable()214 void AudioPipeReader::WaitForPipeReadable() {
215   timer_.Stop();
216   base::MessageLoopForIO::current()->WatchFileDescriptor(
217       pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ,
218       &file_descriptor_watcher_, this);
219 }
220 
221 // static
Destruct(const AudioPipeReader * audio_pipe_reader)222 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
223   audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
224 }
225 
226 }  // namespace remoting
227