• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #define LOG_TAG "audio_proxy_client"
16 
17 #include "StreamOutImpl.h"
18 
19 #include <system/audio.h>
20 #include <time.h>
21 #include <utils/Log.h>
22 
23 #include <cstring>
24 
25 #include "AudioProxyStreamOut.h"
26 
27 using ::android::status_t;
28 
29 namespace audio_proxy {
30 namespace AUDIO_PROXY_CPP_VERSION {
31 namespace {
// Largest data buffer (frameSize * framesCount, in bytes) that
// prepareForWriting accepts: 1 GiB.
constexpr uint32_t kMaxBufferSize = uint32_t{1} << 30;
34 
deleteEventFlag(EventFlag * obj)35 void deleteEventFlag(EventFlag* obj) {
36   if (!obj) {
37     return;
38   }
39 
40   status_t status = EventFlag::deleteEventFlag(&obj);
41   ALOGE_IF(status, "write MQ event flag deletion error: %s", strerror(-status));
42 }
43 
44 class WriteThread : public Thread {
45  public:
46   // WriteThread's lifespan never exceeds StreamOut's lifespan.
47   WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
48               StreamOutImpl::CommandMQ* commandMQ,
49               StreamOutImpl::DataMQ* dataMQ, StreamOutImpl::StatusMQ* statusMQ,
50               EventFlag* eventFlag);
51 
52   ~WriteThread() override;
53 
54  private:
55   bool threadLoop() override;
56 
57   IStreamOut::WriteStatus doGetLatency();
58   IStreamOut::WriteStatus doGetPresentationPosition();
59   IStreamOut::WriteStatus doWrite();
60 
61   std::atomic<bool>* const mStop;
62   AudioProxyStreamOut* mStream;
63   StreamOutImpl::CommandMQ* const mCommandMQ;
64   StreamOutImpl::DataMQ* const mDataMQ;
65   StreamOutImpl::StatusMQ* const mStatusMQ;
66   EventFlag* const mEventFlag;
67   const std::unique_ptr<uint8_t[]> mBuffer;
68 };
69 
WriteThread(std::atomic<bool> * stop,AudioProxyStreamOut * stream,StreamOutImpl::CommandMQ * commandMQ,StreamOutImpl::DataMQ * dataMQ,StreamOutImpl::StatusMQ * statusMQ,EventFlag * eventFlag)70 WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
71                          StreamOutImpl::CommandMQ* commandMQ,
72                          StreamOutImpl::DataMQ* dataMQ,
73                          StreamOutImpl::StatusMQ* statusMQ,
74                          EventFlag* eventFlag)
75     : Thread(false /*canCallJava*/),
76       mStop(stop),
77       mStream(stream),
78       mCommandMQ(commandMQ),
79       mDataMQ(dataMQ),
80       mStatusMQ(statusMQ),
81       mEventFlag(eventFlag),
82       mBuffer(new uint8_t[mDataMQ->getQuantumCount()]) {}
83 
84 WriteThread::~WriteThread() = default;
85 
doWrite()86 IStreamOut::WriteStatus WriteThread::doWrite() {
87   const size_t availToRead = mDataMQ->availableToRead();
88   IStreamOut::WriteStatus status;
89   status.replyTo = IStreamOut::WriteCommand::WRITE;
90   status.retval = Result::OK;
91   status.reply.written = 0;
92   if (mDataMQ->read(&mBuffer[0], availToRead)) {
93     status.reply.written = availToRead;
94     ssize_t writeResult = mStream->write(&mBuffer[0], availToRead);
95     if (writeResult >= 0) {
96       status.reply.written = writeResult;
97       ALOGW_IF(writeResult < availToRead,
98                "Stream doesn't write all the bytes. Drop the unwritten bytes.");
99     } else {
100       status.retval = Result::INVALID_STATE;
101     }
102   }
103 
104   return status;
105 }
106 
doGetPresentationPosition()107 IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() {
108   IStreamOut::WriteStatus status;
109   status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION;
110   status.retval = mStream->getPresentationPosition(
111       &status.reply.presentationPosition.frames,
112       &status.reply.presentationPosition.timeStamp);
113   return status;
114 }
115 
doGetLatency()116 IStreamOut::WriteStatus WriteThread::doGetLatency() {
117   IStreamOut::WriteStatus status;
118   status.replyTo = IStreamOut::WriteCommand::GET_LATENCY;
119   status.retval = Result::OK;
120   status.reply.latencyMs = mStream->getLatency();
121   return status;
122 }
123 
threadLoop()124 bool WriteThread::threadLoop() {
125   // This implementation doesn't return control back to the Thread until the
126   // parent thread decides to stop, as the Thread uses mutexes, and this can
127   // lead to priority inversion.
128   while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
129     uint32_t efState = 0;
130     mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY),
131                      &efState);
132     if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
133       continue;  // Nothing to do.
134     }
135 
136     IStreamOut::WriteCommand replyTo;
137     if (!mCommandMQ->read(&replyTo)) {
138       continue;  // Nothing to do.
139     }
140 
141     IStreamOut::WriteStatus status;
142     switch (replyTo) {
143       case IStreamOut::WriteCommand::WRITE:
144         status = doWrite();
145         break;
146       case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
147         status = doGetPresentationPosition();
148         break;
149       case IStreamOut::WriteCommand::GET_LATENCY:
150         status = doGetLatency();
151         break;
152       default:
153         ALOGE("Unknown write thread command code %d", replyTo);
154         status.retval = Result::NOT_SUPPORTED;
155         break;
156     }
157     if (!mStatusMQ->write(&status)) {
158       ALOGE("status message queue write failed");
159     }
160     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
161   }
162 
163   return false;
164 }
165 
166 }  // namespace
167 
StreamOutImpl(std::unique_ptr<AudioProxyStreamOut> stream)168 StreamOutImpl::StreamOutImpl(std::unique_ptr<AudioProxyStreamOut> stream)
169     : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
170 
~StreamOutImpl()171 StreamOutImpl::~StreamOutImpl() {
172   closeImpl();
173 
174   if (mWriteThread) {
175     status_t status = mWriteThread->join();
176     ALOGE_IF(status, "write thread exit error: %s", strerror(-status));
177   }
178 
179   mEventFlag.reset();
180 }
181 
setEventListener(const sp<IStreamEventListener> & listener)182 Return<void> StreamOutImpl::setEventListener(
183     const sp<IStreamEventListener>& listener) {
184   mEventListener = listener;
185   return Void();
186 }
187 
getFrameSize()188 Return<uint64_t> StreamOutImpl::getFrameSize() {
189   audio_format_t format = static_cast<audio_format_t>(mStream->getFormat());
190 
191   if (!audio_has_proportional_frames(format)) {
192     return sizeof(int8_t);
193   }
194 
195   size_t channel_sample_size = audio_bytes_per_sample(format);
196   return audio_channel_count_from_out_mask(
197              static_cast<audio_channel_mask_t>(mStream->getChannelMask())) *
198          channel_sample_size;
199 }
200 
getFrameCount()201 Return<uint64_t> StreamOutImpl::getFrameCount() {
202   return mStream->getFrameCount();
203 }
204 
getBufferSize()205 Return<uint64_t> StreamOutImpl::getBufferSize() {
206   return mStream->getBufferSize();
207 }
208 
getSampleRate()209 Return<uint32_t> StreamOutImpl::getSampleRate() {
210   return mStream->getSampleRate();
211 }
212 
getSupportedSampleRates(AudioFormat format,getSupportedSampleRates_cb _hidl_cb)213 Return<void> StreamOutImpl::getSupportedSampleRates(
214     AudioFormat format, getSupportedSampleRates_cb _hidl_cb) {
215   _hidl_cb(Result::OK, mStream->getSupportedSampleRates(format));
216   return Void();
217 }
218 
getSupportedChannelMasks(AudioFormat format,getSupportedChannelMasks_cb _hidl_cb)219 Return<void> StreamOutImpl::getSupportedChannelMasks(
220     AudioFormat format, getSupportedChannelMasks_cb _hidl_cb) {
221   _hidl_cb(Result::OK, mStream->getSupportedChannelMasks(format));
222   return Void();
223 }
224 
setSampleRate(uint32_t sampleRateHz)225 Return<Result> StreamOutImpl::setSampleRate(uint32_t sampleRateHz) {
226   return mStream->setSampleRate(sampleRateHz);
227 }
228 
getChannelMask()229 Return<hidl_bitfield<AudioChannelMask>> StreamOutImpl::getChannelMask() {
230   return hidl_bitfield<AudioChannelMask>(mStream->getChannelMask());
231 }
232 
setChannelMask(hidl_bitfield<AudioChannelMask> mask)233 Return<Result> StreamOutImpl::setChannelMask(
234     hidl_bitfield<AudioChannelMask> mask) {
235   return mStream->setChannelMask(mask);
236 }
237 
getFormat()238 Return<AudioFormat> StreamOutImpl::getFormat() { return mStream->getFormat(); }
239 
getSupportedFormats(getSupportedFormats_cb _hidl_cb)240 Return<void> StreamOutImpl::getSupportedFormats(
241     getSupportedFormats_cb _hidl_cb) {
242   _hidl_cb(mStream->getSupportedFormats());
243   return Void();
244 }
245 
setFormat(AudioFormat format)246 Return<Result> StreamOutImpl::setFormat(AudioFormat format) {
247   return mStream->setFormat(format);
248 }
249 
getAudioProperties(getAudioProperties_cb _hidl_cb)250 Return<void> StreamOutImpl::getAudioProperties(getAudioProperties_cb _hidl_cb) {
251   _hidl_cb(mStream->getSampleRate(), mStream->getChannelMask(),
252            mStream->getFormat());
253   return Void();
254 }
255 
addEffect(uint64_t effectId)256 Return<Result> StreamOutImpl::addEffect(uint64_t effectId) {
257   return Result::NOT_SUPPORTED;
258 }
259 
removeEffect(uint64_t effectId)260 Return<Result> StreamOutImpl::removeEffect(uint64_t effectId) {
261   return Result::NOT_SUPPORTED;
262 }
263 
standby()264 Return<Result> StreamOutImpl::standby() { return mStream->standby(); }
265 
getDevices(getDevices_cb _hidl_cb)266 Return<void> StreamOutImpl::getDevices(getDevices_cb _hidl_cb) {
267   _hidl_cb(Result::NOT_SUPPORTED, {});
268   return Void();
269 }
270 
setDevices(const hidl_vec<DeviceAddress> & devices)271 Return<Result> StreamOutImpl::setDevices(
272     const hidl_vec<DeviceAddress>& devices) {
273   return Result::NOT_SUPPORTED;
274 }
275 
getParameters(const hidl_vec<ParameterValue> & context,const hidl_vec<hidl_string> & keys,getParameters_cb _hidl_cb)276 Return<void> StreamOutImpl::getParameters(
277     const hidl_vec<ParameterValue>& context, const hidl_vec<hidl_string>& keys,
278     getParameters_cb _hidl_cb) {
279   _hidl_cb(Result::OK, mStream->getParameters(context, keys));
280   return Void();
281 }
282 
setParameters(const hidl_vec<ParameterValue> & context,const hidl_vec<ParameterValue> & parameters)283 Return<Result> StreamOutImpl::setParameters(
284     const hidl_vec<ParameterValue>& context,
285     const hidl_vec<ParameterValue>& parameters) {
286   return mStream->setParameters(context, parameters);
287 }
288 
setHwAvSync(uint32_t hwAvSync)289 Return<Result> StreamOutImpl::setHwAvSync(uint32_t hwAvSync) {
290   return Result::NOT_SUPPORTED;
291 }
292 
close()293 Return<Result> StreamOutImpl::close() { return closeImpl(); }
294 
closeImpl()295 Result StreamOutImpl::closeImpl() {
296   if (mStopWriteThread.load(
297           std::memory_order_relaxed)) {  // only this thread writes
298     return Result::INVALID_STATE;
299   }
300   mStopWriteThread.store(true, std::memory_order_release);
301   if (mEventFlag) {
302     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
303   }
304 
305   if (mEventListener) {
306     mEventListener->onClose();
307   }
308 
309   return Result::OK;
310 }
311 
getLatency()312 Return<uint32_t> StreamOutImpl::getLatency() { return mStream->getLatency(); }
313 
setVolume(float left,float right)314 Return<Result> StreamOutImpl::setVolume(float left, float right) {
315   return mStream->setVolume(left, right);
316 }
317 
prepareForWriting(uint32_t frameSize,uint32_t framesCount,prepareForWriting_cb _hidl_cb)318 Return<void> StreamOutImpl::prepareForWriting(uint32_t frameSize,
319                                               uint32_t framesCount,
320                                               prepareForWriting_cb _hidl_cb) {
321   ThreadInfo threadInfo = {0, 0};
322 
323   // Wrap the _hidl_cb to return an error
324   auto sendError = [&threadInfo, &_hidl_cb](Result result) -> Return<void> {
325     _hidl_cb(result, CommandMQ::Descriptor(), DataMQ::Descriptor(),
326              StatusMQ::Descriptor(), threadInfo);
327     return Void();
328   };
329 
330   if (mDataMQ) {
331     ALOGE("the client attempted to call prepareForWriting twice");
332     return sendError(Result::INVALID_STATE);
333   }
334 
335   if (frameSize == 0 || framesCount == 0) {
336     ALOGE("Invalid frameSize (%u) or framesCount (%u)", frameSize, framesCount);
337     return sendError(Result::INVALID_ARGUMENTS);
338   }
339 
340   if (frameSize > kMaxBufferSize / framesCount) {
341     ALOGE("Buffer too big: %u*%u bytes > MAX_BUFFER_SIZE (%u)", frameSize,
342           framesCount, kMaxBufferSize);
343     return sendError(Result::INVALID_ARGUMENTS);
344   }
345 
346   auto commandMQ = std::make_unique<CommandMQ>(1);
347   if (!commandMQ->isValid()) {
348     ALOGE("command MQ is invalid");
349     return sendError(Result::INVALID_ARGUMENTS);
350   }
351 
352   auto dataMQ =
353       std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
354   if (!dataMQ->isValid()) {
355     ALOGE("data MQ is invalid");
356     return sendError(Result::INVALID_ARGUMENTS);
357   }
358 
359   auto statusMQ = std::make_unique<StatusMQ>(1);
360   if (!statusMQ->isValid()) {
361     ALOGE("status MQ is invalid");
362     return sendError(Result::INVALID_ARGUMENTS);
363   }
364 
365   EventFlag* rawEventFlag = nullptr;
366   status_t status =
367       EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
368   std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
369                                                          deleteEventFlag);
370   if (status != ::android::OK || !eventFlag) {
371     ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
372     return sendError(Result::INVALID_ARGUMENTS);
373   }
374 
375   sp<WriteThread> writeThread =
376       new WriteThread(&mStopWriteThread, mStream.get(), commandMQ.get(),
377                       dataMQ.get(), statusMQ.get(), eventFlag.get());
378   status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
379   if (status != ::android::OK) {
380     ALOGW("failed to start writer thread: %s", strerror(-status));
381     return sendError(Result::INVALID_ARGUMENTS);
382   }
383 
384   mCommandMQ = std::move(commandMQ);
385   mDataMQ = std::move(dataMQ);
386   mStatusMQ = std::move(statusMQ);
387   mEventFlag = std::move(eventFlag);
388   mWriteThread = std::move(writeThread);
389   threadInfo.pid = getpid();
390   threadInfo.tid = mWriteThread->getTid();
391   _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(),
392            *mStatusMQ->getDesc(), threadInfo);
393   return Void();
394 }
395 
getRenderPosition(getRenderPosition_cb _hidl_cb)396 Return<void> StreamOutImpl::getRenderPosition(getRenderPosition_cb _hidl_cb) {
397   uint32_t dspFrames = 0;
398   Result res = mStream->getRenderPosition(&dspFrames);
399   _hidl_cb(res, dspFrames);
400   return Void();
401 }
402 
getNextWriteTimestamp(getNextWriteTimestamp_cb _hidl_cb)403 Return<void> StreamOutImpl::getNextWriteTimestamp(
404     getNextWriteTimestamp_cb _hidl_cb) {
405   int64_t timestamp = 0;
406   Result res = mStream->getNextWriteTimestamp(&timestamp);
407   _hidl_cb(res, timestamp);
408   return Void();
409 }
410 
setCallback(const sp<IStreamOutCallback> & callback)411 Return<Result> StreamOutImpl::setCallback(
412     const sp<IStreamOutCallback>& callback) {
413   return Result::NOT_SUPPORTED;
414 }
415 
clearCallback()416 Return<Result> StreamOutImpl::clearCallback() { return Result::NOT_SUPPORTED; }
417 
supportsPauseAndResume(supportsPauseAndResume_cb _hidl_cb)418 Return<void> StreamOutImpl::supportsPauseAndResume(
419     supportsPauseAndResume_cb _hidl_cb) {
420   _hidl_cb(true, true);
421   return Void();
422 }
423 
pause()424 Return<Result> StreamOutImpl::pause() { return mStream->pause(); }
425 
resume()426 Return<Result> StreamOutImpl::resume() { return mStream->resume(); }
427 
supportsDrain()428 Return<bool> StreamOutImpl::supportsDrain() { return mStream->supportsDrain(); }
429 
drain(AudioDrain type)430 Return<Result> StreamOutImpl::drain(AudioDrain type) {
431   return mStream->drain(type);
432 }
433 
flush()434 Return<Result> StreamOutImpl::flush() { return mStream->flush(); }
435 
getPresentationPosition(getPresentationPosition_cb _hidl_cb)436 Return<void> StreamOutImpl::getPresentationPosition(
437     getPresentationPosition_cb _hidl_cb) {
438   uint64_t frames = 0;
439   TimeSpec ts = {0, 0};
440   Result result = mStream->getPresentationPosition(&frames, &ts);
441   _hidl_cb(result, frames, ts);
442   return Void();
443 }
444 
start()445 Return<Result> StreamOutImpl::start() { return Result::NOT_SUPPORTED; }
446 
stop()447 Return<Result> StreamOutImpl::stop() { return Result::NOT_SUPPORTED; }
448 
createMmapBuffer(int32_t minSizeFrames,createMmapBuffer_cb _hidl_cb)449 Return<void> StreamOutImpl::createMmapBuffer(int32_t minSizeFrames,
450                                              createMmapBuffer_cb _hidl_cb) {
451   _hidl_cb(Result::NOT_SUPPORTED, MmapBufferInfo());
452   return Void();
453 }
454 
getMmapPosition(getMmapPosition_cb _hidl_cb)455 Return<void> StreamOutImpl::getMmapPosition(getMmapPosition_cb _hidl_cb) {
456   _hidl_cb(Result::NOT_SUPPORTED, MmapPosition());
457   return Void();
458 }
459 
updateSourceMetadata(const SourceMetadata & sourceMetadata)460 Return<void> StreamOutImpl::updateSourceMetadata(
461     const SourceMetadata& sourceMetadata) {
462   return Void();
463 }
464 
selectPresentation(int32_t presentationId,int32_t programId)465 Return<Result> StreamOutImpl::selectPresentation(int32_t presentationId,
466                                                  int32_t programId) {
467   return Result::NOT_SUPPORTED;
468 }
469 
470 }  // namespace AUDIO_PROXY_CPP_VERSION
471 }  // namespace audio_proxy
472