1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #define LOG_TAG "audio_proxy_client"
16
17 #include "StreamOutImpl.h"
18
19 #include <system/audio.h>
20 #include <time.h>
21 #include <utils/Log.h>
22
23 #include <cstring>
24
25 #include "AudioProxyStreamOut.h"
26
27 using ::android::status_t;
28
29 namespace audio_proxy {
30 namespace CPP_VERSION {
31 namespace {
32 // 1GB
33 constexpr uint32_t kMaxBufferSize = 1 << 30;
34
deleteEventFlag(EventFlag * obj)35 void deleteEventFlag(EventFlag* obj) {
36 if (!obj) {
37 return;
38 }
39
40 status_t status = EventFlag::deleteEventFlag(&obj);
41 ALOGE_IF(status, "write MQ event flag deletion error: %s", strerror(-status));
42 }
43
44 class WriteThread : public Thread {
45 public:
46 // WriteThread's lifespan never exceeds StreamOut's lifespan.
47 WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
48 StreamOutImpl::CommandMQ* commandMQ,
49 StreamOutImpl::DataMQ* dataMQ, StreamOutImpl::StatusMQ* statusMQ,
50 EventFlag* eventFlag);
51
52 ~WriteThread() override;
53
54 private:
55 bool threadLoop() override;
56
57 IStreamOut::WriteStatus doGetLatency();
58 IStreamOut::WriteStatus doGetPresentationPosition();
59 IStreamOut::WriteStatus doWrite();
60
61 std::atomic<bool>* const mStop;
62 AudioProxyStreamOut* mStream;
63 StreamOutImpl::CommandMQ* const mCommandMQ;
64 StreamOutImpl::DataMQ* const mDataMQ;
65 StreamOutImpl::StatusMQ* const mStatusMQ;
66 EventFlag* const mEventFlag;
67 const std::unique_ptr<uint8_t[]> mBuffer;
68 };
69
WriteThread(std::atomic<bool> * stop,AudioProxyStreamOut * stream,StreamOutImpl::CommandMQ * commandMQ,StreamOutImpl::DataMQ * dataMQ,StreamOutImpl::StatusMQ * statusMQ,EventFlag * eventFlag)70 WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
71 StreamOutImpl::CommandMQ* commandMQ,
72 StreamOutImpl::DataMQ* dataMQ,
73 StreamOutImpl::StatusMQ* statusMQ,
74 EventFlag* eventFlag)
75 : Thread(false /*canCallJava*/),
76 mStop(stop),
77 mStream(stream),
78 mCommandMQ(commandMQ),
79 mDataMQ(dataMQ),
80 mStatusMQ(statusMQ),
81 mEventFlag(eventFlag),
82 mBuffer(new uint8_t[mDataMQ->getQuantumCount()]) {}
83
84 WriteThread::~WriteThread() = default;
85
doWrite()86 IStreamOut::WriteStatus WriteThread::doWrite() {
87 const size_t availToRead = mDataMQ->availableToRead();
88 IStreamOut::WriteStatus status;
89 status.replyTo = IStreamOut::WriteCommand::WRITE;
90 status.retval = Result::OK;
91 status.reply.written = 0;
92 if (mDataMQ->read(&mBuffer[0], availToRead)) {
93 status.reply.written = availToRead;
94 ssize_t writeResult = mStream->write(&mBuffer[0], availToRead);
95 if (writeResult >= 0) {
96 status.reply.written = writeResult;
97 ALOGW_IF(writeResult < availToRead,
98 "Stream doesn't write all the bytes. Drop the unwritten bytes.");
99 } else {
100 status.retval = Result::INVALID_STATE;
101 }
102 }
103
104 return status;
105 }
106
doGetPresentationPosition()107 IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() {
108 IStreamOut::WriteStatus status;
109 status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION;
110 status.retval = mStream->getPresentationPosition(
111 &status.reply.presentationPosition.frames,
112 &status.reply.presentationPosition.timeStamp);
113 return status;
114 }
115
doGetLatency()116 IStreamOut::WriteStatus WriteThread::doGetLatency() {
117 IStreamOut::WriteStatus status;
118 status.replyTo = IStreamOut::WriteCommand::GET_LATENCY;
119 status.retval = Result::OK;
120 status.reply.latencyMs = mStream->getLatency();
121 return status;
122 }
123
threadLoop()124 bool WriteThread::threadLoop() {
125 // This implementation doesn't return control back to the Thread until the
126 // parent thread decides to stop, as the Thread uses mutexes, and this can
127 // lead to priority inversion.
128 while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
129 uint32_t efState = 0;
130 mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY),
131 &efState);
132 if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
133 continue; // Nothing to do.
134 }
135
136 IStreamOut::WriteCommand replyTo;
137 if (!mCommandMQ->read(&replyTo)) {
138 continue; // Nothing to do.
139 }
140
141 IStreamOut::WriteStatus status;
142 switch (replyTo) {
143 case IStreamOut::WriteCommand::WRITE:
144 status = doWrite();
145 break;
146 case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
147 status = doGetPresentationPosition();
148 break;
149 case IStreamOut::WriteCommand::GET_LATENCY:
150 status = doGetLatency();
151 break;
152 default:
153 ALOGE("Unknown write thread command code %d", replyTo);
154 status.retval = Result::NOT_SUPPORTED;
155 break;
156 }
157 if (!mStatusMQ->write(&status)) {
158 ALOGE("status message queue write failed");
159 }
160 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
161 }
162
163 return false;
164 }
165
166 } // namespace
167
StreamOutImpl(std::unique_ptr<AudioProxyStreamOut> stream)168 StreamOutImpl::StreamOutImpl(std::unique_ptr<AudioProxyStreamOut> stream)
169 : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
170
~StreamOutImpl()171 StreamOutImpl::~StreamOutImpl() {
172 closeImpl();
173
174 if (mWriteThread) {
175 status_t status = mWriteThread->join();
176 ALOGE_IF(status, "write thread exit error: %s", strerror(-status));
177 }
178
179 mEventFlag.reset();
180 }
181
getFrameSize()182 Return<uint64_t> StreamOutImpl::getFrameSize() {
183 audio_format_t format = static_cast<audio_format_t>(mStream->getFormat());
184
185 if (!audio_has_proportional_frames(format)) {
186 return sizeof(int8_t);
187 }
188
189 size_t channel_sample_size = audio_bytes_per_sample(format);
190 return audio_channel_count_from_out_mask(
191 static_cast<audio_channel_mask_t>(mStream->getChannelMask())) *
192 channel_sample_size;
193 }
194
getFrameCount()195 Return<uint64_t> StreamOutImpl::getFrameCount() {
196 return mStream->getFrameCount();
197 }
198
getBufferSize()199 Return<uint64_t> StreamOutImpl::getBufferSize() {
200 return mStream->getBufferSize();
201 }
202
getSampleRate()203 Return<uint32_t> StreamOutImpl::getSampleRate() {
204 return mStream->getSampleRate();
205 }
206
getSupportedSampleRates(AudioFormat format,getSupportedSampleRates_cb _hidl_cb)207 Return<void> StreamOutImpl::getSupportedSampleRates(
208 AudioFormat format, getSupportedSampleRates_cb _hidl_cb) {
209 _hidl_cb(Result::OK, mStream->getSupportedSampleRates(format));
210 return Void();
211 }
212
getSupportedChannelMasks(AudioFormat format,getSupportedChannelMasks_cb _hidl_cb)213 Return<void> StreamOutImpl::getSupportedChannelMasks(
214 AudioFormat format, getSupportedChannelMasks_cb _hidl_cb) {
215 _hidl_cb(Result::OK, mStream->getSupportedChannelMasks(format));
216 return Void();
217 }
218
setSampleRate(uint32_t sampleRateHz)219 Return<Result> StreamOutImpl::setSampleRate(uint32_t sampleRateHz) {
220 return mStream->setSampleRate(sampleRateHz);
221 }
222
getChannelMask()223 Return<hidl_bitfield<AudioChannelMask>> StreamOutImpl::getChannelMask() {
224 return hidl_bitfield<AudioChannelMask>(mStream->getChannelMask());
225 }
226
setChannelMask(hidl_bitfield<AudioChannelMask> mask)227 Return<Result> StreamOutImpl::setChannelMask(
228 hidl_bitfield<AudioChannelMask> mask) {
229 return mStream->setChannelMask(mask);
230 }
231
getFormat()232 Return<AudioFormat> StreamOutImpl::getFormat() { return mStream->getFormat(); }
233
getSupportedFormats(getSupportedFormats_cb _hidl_cb)234 Return<void> StreamOutImpl::getSupportedFormats(
235 getSupportedFormats_cb _hidl_cb) {
236 _hidl_cb(mStream->getSupportedFormats());
237 return Void();
238 }
239
setFormat(AudioFormat format)240 Return<Result> StreamOutImpl::setFormat(AudioFormat format) {
241 return mStream->setFormat(format);
242 }
243
getAudioProperties(getAudioProperties_cb _hidl_cb)244 Return<void> StreamOutImpl::getAudioProperties(getAudioProperties_cb _hidl_cb) {
245 _hidl_cb(mStream->getSampleRate(), mStream->getChannelMask(),
246 mStream->getFormat());
247 return Void();
248 }
249
addEffect(uint64_t effectId)250 Return<Result> StreamOutImpl::addEffect(uint64_t effectId) {
251 return Result::NOT_SUPPORTED;
252 }
253
removeEffect(uint64_t effectId)254 Return<Result> StreamOutImpl::removeEffect(uint64_t effectId) {
255 return Result::NOT_SUPPORTED;
256 }
257
standby()258 Return<Result> StreamOutImpl::standby() { return mStream->standby(); }
259
getDevices(getDevices_cb _hidl_cb)260 Return<void> StreamOutImpl::getDevices(getDevices_cb _hidl_cb) {
261 _hidl_cb(Result::NOT_SUPPORTED, {});
262 return Void();
263 }
264
setDevices(const hidl_vec<DeviceAddress> & devices)265 Return<Result> StreamOutImpl::setDevices(
266 const hidl_vec<DeviceAddress>& devices) {
267 return Result::NOT_SUPPORTED;
268 }
269
getParameters(const hidl_vec<ParameterValue> & context,const hidl_vec<hidl_string> & keys,getParameters_cb _hidl_cb)270 Return<void> StreamOutImpl::getParameters(
271 const hidl_vec<ParameterValue>& context, const hidl_vec<hidl_string>& keys,
272 getParameters_cb _hidl_cb) {
273 _hidl_cb(Result::OK, mStream->getParameters(context, keys));
274 return Void();
275 }
276
setParameters(const hidl_vec<ParameterValue> & context,const hidl_vec<ParameterValue> & parameters)277 Return<Result> StreamOutImpl::setParameters(
278 const hidl_vec<ParameterValue>& context,
279 const hidl_vec<ParameterValue>& parameters) {
280 return mStream->setParameters(context, parameters);
281 }
282
setHwAvSync(uint32_t hwAvSync)283 Return<Result> StreamOutImpl::setHwAvSync(uint32_t hwAvSync) {
284 return Result::NOT_SUPPORTED;
285 }
286
close()287 Return<Result> StreamOutImpl::close() { return closeImpl(); }
288
closeImpl()289 Result StreamOutImpl::closeImpl() {
290 if (mStopWriteThread.load(
291 std::memory_order_relaxed)) { // only this thread writes
292 return Result::INVALID_STATE;
293 }
294 mStopWriteThread.store(true, std::memory_order_release);
295 if (mEventFlag) {
296 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
297 }
298 return Result::OK;
299 }
300
getLatency()301 Return<uint32_t> StreamOutImpl::getLatency() { return mStream->getLatency(); }
302
setVolume(float left,float right)303 Return<Result> StreamOutImpl::setVolume(float left, float right) {
304 return mStream->setVolume(left, right);
305 }
306
prepareForWriting(uint32_t frameSize,uint32_t framesCount,prepareForWriting_cb _hidl_cb)307 Return<void> StreamOutImpl::prepareForWriting(uint32_t frameSize,
308 uint32_t framesCount,
309 prepareForWriting_cb _hidl_cb) {
310 ThreadInfo threadInfo = {0, 0};
311
312 // Wrap the _hidl_cb to return an error
313 auto sendError = [&threadInfo, &_hidl_cb](Result result) -> Return<void> {
314 _hidl_cb(result, CommandMQ::Descriptor(), DataMQ::Descriptor(),
315 StatusMQ::Descriptor(), threadInfo);
316 return Void();
317 };
318
319 if (mDataMQ) {
320 ALOGE("the client attempted to call prepareForWriting twice");
321 return sendError(Result::INVALID_STATE);
322 }
323
324 if (frameSize == 0 || framesCount == 0) {
325 ALOGE("Invalid frameSize (%u) or framesCount (%u)", frameSize, framesCount);
326 return sendError(Result::INVALID_ARGUMENTS);
327 }
328
329 if (frameSize > kMaxBufferSize / framesCount) {
330 ALOGE("Buffer too big: %u*%u bytes > MAX_BUFFER_SIZE (%u)", frameSize,
331 framesCount, kMaxBufferSize);
332 return sendError(Result::INVALID_ARGUMENTS);
333 }
334
335 auto commandMQ = std::make_unique<CommandMQ>(1);
336 if (!commandMQ->isValid()) {
337 ALOGE("command MQ is invalid");
338 return sendError(Result::INVALID_ARGUMENTS);
339 }
340
341 auto dataMQ =
342 std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
343 if (!dataMQ->isValid()) {
344 ALOGE("data MQ is invalid");
345 return sendError(Result::INVALID_ARGUMENTS);
346 }
347
348 auto statusMQ = std::make_unique<StatusMQ>(1);
349 if (!statusMQ->isValid()) {
350 ALOGE("status MQ is invalid");
351 return sendError(Result::INVALID_ARGUMENTS);
352 }
353
354 EventFlag* rawEventFlag = nullptr;
355 status_t status =
356 EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
357 std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
358 deleteEventFlag);
359 if (status != ::android::OK || !eventFlag) {
360 ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
361 return sendError(Result::INVALID_ARGUMENTS);
362 }
363
364 sp<WriteThread> writeThread =
365 new WriteThread(&mStopWriteThread, mStream.get(), commandMQ.get(),
366 dataMQ.get(), statusMQ.get(), eventFlag.get());
367 status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
368 if (status != ::android::OK) {
369 ALOGW("failed to start writer thread: %s", strerror(-status));
370 return sendError(Result::INVALID_ARGUMENTS);
371 }
372
373 mCommandMQ = std::move(commandMQ);
374 mDataMQ = std::move(dataMQ);
375 mStatusMQ = std::move(statusMQ);
376 mEventFlag = std::move(eventFlag);
377 mWriteThread = std::move(writeThread);
378 threadInfo.pid = getpid();
379 threadInfo.tid = mWriteThread->getTid();
380 _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(),
381 *mStatusMQ->getDesc(), threadInfo);
382 return Void();
383 }
384
getRenderPosition(getRenderPosition_cb _hidl_cb)385 Return<void> StreamOutImpl::getRenderPosition(getRenderPosition_cb _hidl_cb) {
386 uint32_t dspFrames = 0;
387 Result res = mStream->getRenderPosition(&dspFrames);
388 _hidl_cb(res, dspFrames);
389 return Void();
390 }
391
getNextWriteTimestamp(getNextWriteTimestamp_cb _hidl_cb)392 Return<void> StreamOutImpl::getNextWriteTimestamp(
393 getNextWriteTimestamp_cb _hidl_cb) {
394 int64_t timestamp = 0;
395 Result res = mStream->getNextWriteTimestamp(×tamp);
396 _hidl_cb(res, timestamp);
397 return Void();
398 }
399
setCallback(const sp<IStreamOutCallback> & callback)400 Return<Result> StreamOutImpl::setCallback(
401 const sp<IStreamOutCallback>& callback) {
402 return Result::NOT_SUPPORTED;
403 }
404
clearCallback()405 Return<Result> StreamOutImpl::clearCallback() { return Result::NOT_SUPPORTED; }
406
supportsPauseAndResume(supportsPauseAndResume_cb _hidl_cb)407 Return<void> StreamOutImpl::supportsPauseAndResume(
408 supportsPauseAndResume_cb _hidl_cb) {
409 _hidl_cb(true, true);
410 return Void();
411 }
412
pause()413 Return<Result> StreamOutImpl::pause() { return mStream->pause(); }
414
resume()415 Return<Result> StreamOutImpl::resume() { return mStream->resume(); }
416
supportsDrain()417 Return<bool> StreamOutImpl::supportsDrain() { return mStream->supportsDrain(); }
418
drain(AudioDrain type)419 Return<Result> StreamOutImpl::drain(AudioDrain type) {
420 return mStream->drain(type);
421 }
422
flush()423 Return<Result> StreamOutImpl::flush() { return mStream->flush(); }
424
getPresentationPosition(getPresentationPosition_cb _hidl_cb)425 Return<void> StreamOutImpl::getPresentationPosition(
426 getPresentationPosition_cb _hidl_cb) {
427 uint64_t frames = 0;
428 TimeSpec ts = {0, 0};
429 Result result = mStream->getPresentationPosition(&frames, &ts);
430 _hidl_cb(result, frames, ts);
431 return Void();
432 }
433
start()434 Return<Result> StreamOutImpl::start() { return Result::NOT_SUPPORTED; }
435
stop()436 Return<Result> StreamOutImpl::stop() { return Result::NOT_SUPPORTED; }
437
createMmapBuffer(int32_t minSizeFrames,createMmapBuffer_cb _hidl_cb)438 Return<void> StreamOutImpl::createMmapBuffer(int32_t minSizeFrames,
439 createMmapBuffer_cb _hidl_cb) {
440 _hidl_cb(Result::NOT_SUPPORTED, MmapBufferInfo());
441 return Void();
442 }
443
getMmapPosition(getMmapPosition_cb _hidl_cb)444 Return<void> StreamOutImpl::getMmapPosition(getMmapPosition_cb _hidl_cb) {
445 _hidl_cb(Result::NOT_SUPPORTED, MmapPosition());
446 return Void();
447 }
448
updateSourceMetadata(const SourceMetadata & sourceMetadata)449 Return<void> StreamOutImpl::updateSourceMetadata(
450 const SourceMetadata& sourceMetadata) {
451 return Void();
452 }
453
selectPresentation(int32_t presentationId,int32_t programId)454 Return<Result> StreamOutImpl::selectPresentation(int32_t presentationId,
455 int32_t programId) {
456 return Result::NOT_SUPPORTED;
457 }
458
459 } // namespace CPP_VERSION
460 } // namespace audio_proxy
461