• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/host/linux/audio_pipe_reader.h"
6 
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #include "base/files/file_path.h"
13 #include "base/logging.h"
14 #include "base/posix/eintr_wrapper.h"
15 #include "base/stl_util.h"
16 
17 namespace remoting {
18 
19 namespace {
20 
21 // PulseAudio's module-pipe-sink must be configured to use the following
22 // parameters for the sink we read from.
23 const int kSamplesPerSecond = 48000;
24 const int kChannels = 2;
25 const int kBytesPerSample = 2;
26 const int kSampleBytesPerSecond =
27     kSamplesPerSecond * kChannels * kBytesPerSample;
28 
29 // Read data from the pipe every 40ms.
30 const int kCapturingPeriodMs = 40;
31 
32 // Size of the pipe buffer in milliseconds.
33 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
34 
35 // Size of the pipe buffer in bytes.
36 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
37     base::Time::kMillisecondsPerSecond;
38 
39 #if !defined(F_SETPIPE_SZ)
40 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
41 // to compile this code on machines with older kernel.
42 #define F_SETPIPE_SZ 1031
43 #endif  // defined(F_SETPIPE_SZ)
44 
45 }  // namespace
46 
47 // static
Create(scoped_refptr<base::SingleThreadTaskRunner> task_runner,const base::FilePath & pipe_name)48 scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
49     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
50     const base::FilePath& pipe_name) {
51   // Create a reference to the new AudioPipeReader before posting the
52   // StartOnAudioThread task, otherwise it may be deleted on the audio
53   // thread before we return.
54   scoped_refptr<AudioPipeReader> pipe_reader =
55       new AudioPipeReader(task_runner);
56   task_runner->PostTask(FROM_HERE, base::Bind(
57       &AudioPipeReader::StartOnAudioThread, pipe_reader, pipe_name));
58   return pipe_reader;
59 }
60 
StartOnAudioThread(const base::FilePath & pipe_name)61 void AudioPipeReader::StartOnAudioThread(const base::FilePath& pipe_name) {
62   DCHECK(task_runner_->BelongsToCurrentThread());
63 
64   pipe_fd_ = HANDLE_EINTR(open(
65       pipe_name.value().c_str(), O_RDONLY | O_NONBLOCK));
66   if (pipe_fd_ < 0) {
67     LOG(ERROR) << "Failed to open " << pipe_name.value();
68     return;
69   }
70 
71   // Set buffer size for the pipe.
72   int result = HANDLE_EINTR(
73       fcntl(pipe_fd_, F_SETPIPE_SZ, kPipeBufferSizeBytes));
74   if (result < 0) {
75     PLOG(ERROR) << "fcntl";
76   }
77 
78   WaitForPipeReadable();
79 }
80 
AudioPipeReader(scoped_refptr<base::SingleThreadTaskRunner> task_runner)81 AudioPipeReader::AudioPipeReader(
82     scoped_refptr<base::SingleThreadTaskRunner> task_runner)
83     : task_runner_(task_runner),
84       observers_(new ObserverListThreadSafe<StreamObserver>()) {
85 }
86 
~AudioPipeReader()87 AudioPipeReader::~AudioPipeReader() {
88 }
89 
AddObserver(StreamObserver * observer)90 void AudioPipeReader::AddObserver(StreamObserver* observer) {
91   observers_->AddObserver(observer);
92 }
RemoveObserver(StreamObserver * observer)93 void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
94   observers_->RemoveObserver(observer);
95 }
96 
OnFileCanReadWithoutBlocking(int fd)97 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
98   DCHECK_EQ(fd, pipe_fd_);
99   StartTimer();
100 }
101 
OnFileCanWriteWithoutBlocking(int fd)102 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
103   NOTREACHED();
104 }
105 
StartTimer()106 void AudioPipeReader::StartTimer() {
107   DCHECK(task_runner_->BelongsToCurrentThread());
108   started_time_ = base::TimeTicks::Now();
109   last_capture_position_ = 0;
110   timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
111                this, &AudioPipeReader::DoCapture);
112 }
113 
DoCapture()114 void AudioPipeReader::DoCapture() {
115   DCHECK(task_runner_->BelongsToCurrentThread());
116   DCHECK_GT(pipe_fd_, 0);
117 
118   // Calculate how much we need read from the pipe. Pulseaudio doesn't control
119   // how much data it writes to the pipe, so we need to pace the stream, so
120   // that we read the exact number of the samples per second we need.
121   base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
122   int64 stream_position_bytes = stream_position.InMilliseconds() *
123       kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
124   int64 bytes_to_read = stream_position_bytes - last_capture_position_;
125 
126   std::string data = left_over_bytes_;
127   size_t pos = data.size();
128   left_over_bytes_.clear();
129   data.resize(pos + bytes_to_read);
130 
131   while (pos < data.size()) {
132     int read_result = HANDLE_EINTR(
133        read(pipe_fd_, string_as_array(&data) + pos, data.size() - pos));
134     if (read_result > 0) {
135       pos += read_result;
136     } else {
137       if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
138         PLOG(ERROR) << "read";
139       break;
140     }
141   }
142 
143   // Stop reading from the pipe if PulseAudio isn't writing anything.
144   if (pos == 0) {
145     WaitForPipeReadable();
146     return;
147   }
148 
149   // Save any incomplete samples we've read for later. Each packet should
150   // contain integer number of samples.
151   int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
152   left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
153                           incomplete_samples_bytes);
154   data.resize(pos - incomplete_samples_bytes);
155 
156   last_capture_position_ += data.size();
157   // Normally PulseAudio will keep pipe buffer full, so we should always be able
158   // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
159   // sure that |stream_position_bytes| doesn't go out of sync with the current
160   // stream position.
161   if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
162     last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
163   DCHECK_LE(last_capture_position_, stream_position_bytes);
164 
165   // Dispatch asynchronous notification to the stream observers.
166   scoped_refptr<base::RefCountedString> data_ref =
167       base::RefCountedString::TakeString(&data);
168   observers_->Notify(&StreamObserver::OnDataRead, data_ref);
169 }
170 
WaitForPipeReadable()171 void AudioPipeReader::WaitForPipeReadable() {
172   timer_.Stop();
173   base::MessageLoopForIO::current()->WatchFileDescriptor(
174       pipe_fd_,
175       false,
176       base::MessageLoopForIO::WATCH_READ,
177       &file_descriptor_watcher_,
178       this);
179 }
180 
181 // static
Destruct(const AudioPipeReader * audio_pipe_reader)182 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
183   audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
184 }
185 
186 }  // namespace remoting
187