• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #define LOG_TAG "audio_proxy_client"
16 
17 #include "StreamOutImpl.h"
18 
19 #include <system/audio.h>
20 #include <time.h>
21 #include <utils/Log.h>
22 
23 #include <cstring>
24 
25 #include "AudioProxyStreamOut.h"
26 
27 using ::android::status_t;
28 
29 namespace audio_proxy {
30 namespace CPP_VERSION {
31 namespace {
32 // 1GB
33 constexpr uint32_t kMaxBufferSize = 1 << 30;
34 
deleteEventFlag(EventFlag * obj)35 void deleteEventFlag(EventFlag* obj) {
36   if (!obj) {
37     return;
38   }
39 
40   status_t status = EventFlag::deleteEventFlag(&obj);
41   ALOGE_IF(status, "write MQ event flag deletion error: %s", strerror(-status));
42 }
43 
44 class WriteThread : public Thread {
45  public:
46   // WriteThread's lifespan never exceeds StreamOut's lifespan.
47   WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
48               StreamOutImpl::CommandMQ* commandMQ,
49               StreamOutImpl::DataMQ* dataMQ, StreamOutImpl::StatusMQ* statusMQ,
50               EventFlag* eventFlag);
51 
52   ~WriteThread() override;
53 
54  private:
55   bool threadLoop() override;
56 
57   IStreamOut::WriteStatus doGetLatency();
58   IStreamOut::WriteStatus doGetPresentationPosition();
59   IStreamOut::WriteStatus doWrite();
60 
61   std::atomic<bool>* const mStop;
62   AudioProxyStreamOut* mStream;
63   StreamOutImpl::CommandMQ* const mCommandMQ;
64   StreamOutImpl::DataMQ* const mDataMQ;
65   StreamOutImpl::StatusMQ* const mStatusMQ;
66   EventFlag* const mEventFlag;
67   const std::unique_ptr<uint8_t[]> mBuffer;
68 };
69 
WriteThread(std::atomic<bool> * stop,AudioProxyStreamOut * stream,StreamOutImpl::CommandMQ * commandMQ,StreamOutImpl::DataMQ * dataMQ,StreamOutImpl::StatusMQ * statusMQ,EventFlag * eventFlag)70 WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
71                          StreamOutImpl::CommandMQ* commandMQ,
72                          StreamOutImpl::DataMQ* dataMQ,
73                          StreamOutImpl::StatusMQ* statusMQ,
74                          EventFlag* eventFlag)
75     : Thread(false /*canCallJava*/),
76       mStop(stop),
77       mStream(stream),
78       mCommandMQ(commandMQ),
79       mDataMQ(dataMQ),
80       mStatusMQ(statusMQ),
81       mEventFlag(eventFlag),
82       mBuffer(new uint8_t[mDataMQ->getQuantumCount()]) {}
83 
84 WriteThread::~WriteThread() = default;
85 
doWrite()86 IStreamOut::WriteStatus WriteThread::doWrite() {
87   const size_t availToRead = mDataMQ->availableToRead();
88   IStreamOut::WriteStatus status;
89   status.replyTo = IStreamOut::WriteCommand::WRITE;
90   status.retval = Result::OK;
91   status.reply.written = 0;
92   if (mDataMQ->read(&mBuffer[0], availToRead)) {
93     status.reply.written = availToRead;
94     ssize_t writeResult = mStream->write(&mBuffer[0], availToRead);
95     if (writeResult >= 0) {
96       status.reply.written = writeResult;
97       ALOGW_IF(writeResult < availToRead,
98                "Stream doesn't write all the bytes. Drop the unwritten bytes.");
99     } else {
100       status.retval = Result::INVALID_STATE;
101     }
102   }
103 
104   return status;
105 }
106 
doGetPresentationPosition()107 IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() {
108   IStreamOut::WriteStatus status;
109   status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION;
110   status.retval = mStream->getPresentationPosition(
111       &status.reply.presentationPosition.frames,
112       &status.reply.presentationPosition.timeStamp);
113   return status;
114 }
115 
doGetLatency()116 IStreamOut::WriteStatus WriteThread::doGetLatency() {
117   IStreamOut::WriteStatus status;
118   status.replyTo = IStreamOut::WriteCommand::GET_LATENCY;
119   status.retval = Result::OK;
120   status.reply.latencyMs = mStream->getLatency();
121   return status;
122 }
123 
threadLoop()124 bool WriteThread::threadLoop() {
125   // This implementation doesn't return control back to the Thread until the
126   // parent thread decides to stop, as the Thread uses mutexes, and this can
127   // lead to priority inversion.
128   while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
129     uint32_t efState = 0;
130     mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY),
131                      &efState);
132     if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
133       continue;  // Nothing to do.
134     }
135 
136     IStreamOut::WriteCommand replyTo;
137     if (!mCommandMQ->read(&replyTo)) {
138       continue;  // Nothing to do.
139     }
140 
141     IStreamOut::WriteStatus status;
142     switch (replyTo) {
143       case IStreamOut::WriteCommand::WRITE:
144         status = doWrite();
145         break;
146       case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
147         status = doGetPresentationPosition();
148         break;
149       case IStreamOut::WriteCommand::GET_LATENCY:
150         status = doGetLatency();
151         break;
152       default:
153         ALOGE("Unknown write thread command code %d", replyTo);
154         status.retval = Result::NOT_SUPPORTED;
155         break;
156     }
157     if (!mStatusMQ->write(&status)) {
158       ALOGE("status message queue write failed");
159     }
160     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
161   }
162 
163   return false;
164 }
165 
166 }  // namespace
167 
StreamOutImpl(std::unique_ptr<AudioProxyStreamOut> stream)168 StreamOutImpl::StreamOutImpl(std::unique_ptr<AudioProxyStreamOut> stream)
169     : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
170 
~StreamOutImpl()171 StreamOutImpl::~StreamOutImpl() {
172   closeImpl();
173 
174   if (mWriteThread) {
175     status_t status = mWriteThread->join();
176     ALOGE_IF(status, "write thread exit error: %s", strerror(-status));
177   }
178 
179   mEventFlag.reset();
180 }
181 
getFrameSize()182 Return<uint64_t> StreamOutImpl::getFrameSize() {
183   audio_format_t format = static_cast<audio_format_t>(mStream->getFormat());
184 
185   if (!audio_has_proportional_frames(format)) {
186     return sizeof(int8_t);
187   }
188 
189   size_t channel_sample_size = audio_bytes_per_sample(format);
190   return audio_channel_count_from_out_mask(
191              static_cast<audio_channel_mask_t>(mStream->getChannelMask())) *
192          channel_sample_size;
193 }
194 
getFrameCount()195 Return<uint64_t> StreamOutImpl::getFrameCount() {
196   return mStream->getFrameCount();
197 }
198 
getBufferSize()199 Return<uint64_t> StreamOutImpl::getBufferSize() {
200   return mStream->getBufferSize();
201 }
202 
getSampleRate()203 Return<uint32_t> StreamOutImpl::getSampleRate() {
204   return mStream->getSampleRate();
205 }
206 
getSupportedSampleRates(AudioFormat format,getSupportedSampleRates_cb _hidl_cb)207 Return<void> StreamOutImpl::getSupportedSampleRates(
208     AudioFormat format, getSupportedSampleRates_cb _hidl_cb) {
209   _hidl_cb(Result::OK, mStream->getSupportedSampleRates(format));
210   return Void();
211 }
212 
getSupportedChannelMasks(AudioFormat format,getSupportedChannelMasks_cb _hidl_cb)213 Return<void> StreamOutImpl::getSupportedChannelMasks(
214     AudioFormat format, getSupportedChannelMasks_cb _hidl_cb) {
215   _hidl_cb(Result::OK, mStream->getSupportedChannelMasks(format));
216   return Void();
217 }
218 
setSampleRate(uint32_t sampleRateHz)219 Return<Result> StreamOutImpl::setSampleRate(uint32_t sampleRateHz) {
220   return mStream->setSampleRate(sampleRateHz);
221 }
222 
getChannelMask()223 Return<hidl_bitfield<AudioChannelMask>> StreamOutImpl::getChannelMask() {
224   return hidl_bitfield<AudioChannelMask>(mStream->getChannelMask());
225 }
226 
setChannelMask(hidl_bitfield<AudioChannelMask> mask)227 Return<Result> StreamOutImpl::setChannelMask(
228     hidl_bitfield<AudioChannelMask> mask) {
229   return mStream->setChannelMask(mask);
230 }
231 
getFormat()232 Return<AudioFormat> StreamOutImpl::getFormat() { return mStream->getFormat(); }
233 
getSupportedFormats(getSupportedFormats_cb _hidl_cb)234 Return<void> StreamOutImpl::getSupportedFormats(
235     getSupportedFormats_cb _hidl_cb) {
236   _hidl_cb(mStream->getSupportedFormats());
237   return Void();
238 }
239 
setFormat(AudioFormat format)240 Return<Result> StreamOutImpl::setFormat(AudioFormat format) {
241   return mStream->setFormat(format);
242 }
243 
getAudioProperties(getAudioProperties_cb _hidl_cb)244 Return<void> StreamOutImpl::getAudioProperties(getAudioProperties_cb _hidl_cb) {
245   _hidl_cb(mStream->getSampleRate(), mStream->getChannelMask(),
246            mStream->getFormat());
247   return Void();
248 }
249 
addEffect(uint64_t effectId)250 Return<Result> StreamOutImpl::addEffect(uint64_t effectId) {
251   return Result::NOT_SUPPORTED;
252 }
253 
removeEffect(uint64_t effectId)254 Return<Result> StreamOutImpl::removeEffect(uint64_t effectId) {
255   return Result::NOT_SUPPORTED;
256 }
257 
standby()258 Return<Result> StreamOutImpl::standby() { return mStream->standby(); }
259 
getDevices(getDevices_cb _hidl_cb)260 Return<void> StreamOutImpl::getDevices(getDevices_cb _hidl_cb) {
261   _hidl_cb(Result::NOT_SUPPORTED, {});
262   return Void();
263 }
264 
setDevices(const hidl_vec<DeviceAddress> & devices)265 Return<Result> StreamOutImpl::setDevices(
266     const hidl_vec<DeviceAddress>& devices) {
267   return Result::NOT_SUPPORTED;
268 }
269 
getParameters(const hidl_vec<ParameterValue> & context,const hidl_vec<hidl_string> & keys,getParameters_cb _hidl_cb)270 Return<void> StreamOutImpl::getParameters(
271     const hidl_vec<ParameterValue>& context, const hidl_vec<hidl_string>& keys,
272     getParameters_cb _hidl_cb) {
273   _hidl_cb(Result::OK, mStream->getParameters(context, keys));
274   return Void();
275 }
276 
setParameters(const hidl_vec<ParameterValue> & context,const hidl_vec<ParameterValue> & parameters)277 Return<Result> StreamOutImpl::setParameters(
278     const hidl_vec<ParameterValue>& context,
279     const hidl_vec<ParameterValue>& parameters) {
280   return mStream->setParameters(context, parameters);
281 }
282 
setHwAvSync(uint32_t hwAvSync)283 Return<Result> StreamOutImpl::setHwAvSync(uint32_t hwAvSync) {
284   return Result::NOT_SUPPORTED;
285 }
286 
close()287 Return<Result> StreamOutImpl::close() { return closeImpl(); }
288 
closeImpl()289 Result StreamOutImpl::closeImpl() {
290   if (mStopWriteThread.load(
291           std::memory_order_relaxed)) {  // only this thread writes
292     return Result::INVALID_STATE;
293   }
294   mStopWriteThread.store(true, std::memory_order_release);
295   if (mEventFlag) {
296     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
297   }
298   return Result::OK;
299 }
300 
getLatency()301 Return<uint32_t> StreamOutImpl::getLatency() { return mStream->getLatency(); }
302 
setVolume(float left,float right)303 Return<Result> StreamOutImpl::setVolume(float left, float right) {
304   return mStream->setVolume(left, right);
305 }
306 
prepareForWriting(uint32_t frameSize,uint32_t framesCount,prepareForWriting_cb _hidl_cb)307 Return<void> StreamOutImpl::prepareForWriting(uint32_t frameSize,
308                                               uint32_t framesCount,
309                                               prepareForWriting_cb _hidl_cb) {
310   ThreadInfo threadInfo = {0, 0};
311 
312   // Wrap the _hidl_cb to return an error
313   auto sendError = [&threadInfo, &_hidl_cb](Result result) -> Return<void> {
314     _hidl_cb(result, CommandMQ::Descriptor(), DataMQ::Descriptor(),
315              StatusMQ::Descriptor(), threadInfo);
316     return Void();
317   };
318 
319   if (mDataMQ) {
320     ALOGE("the client attempted to call prepareForWriting twice");
321     return sendError(Result::INVALID_STATE);
322   }
323 
324   if (frameSize == 0 || framesCount == 0) {
325     ALOGE("Invalid frameSize (%u) or framesCount (%u)", frameSize, framesCount);
326     return sendError(Result::INVALID_ARGUMENTS);
327   }
328 
329   if (frameSize > kMaxBufferSize / framesCount) {
330     ALOGE("Buffer too big: %u*%u bytes > MAX_BUFFER_SIZE (%u)", frameSize,
331           framesCount, kMaxBufferSize);
332     return sendError(Result::INVALID_ARGUMENTS);
333   }
334 
335   auto commandMQ = std::make_unique<CommandMQ>(1);
336   if (!commandMQ->isValid()) {
337     ALOGE("command MQ is invalid");
338     return sendError(Result::INVALID_ARGUMENTS);
339   }
340 
341   auto dataMQ =
342       std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
343   if (!dataMQ->isValid()) {
344     ALOGE("data MQ is invalid");
345     return sendError(Result::INVALID_ARGUMENTS);
346   }
347 
348   auto statusMQ = std::make_unique<StatusMQ>(1);
349   if (!statusMQ->isValid()) {
350     ALOGE("status MQ is invalid");
351     return sendError(Result::INVALID_ARGUMENTS);
352   }
353 
354   EventFlag* rawEventFlag = nullptr;
355   status_t status =
356       EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
357   std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
358                                                          deleteEventFlag);
359   if (status != ::android::OK || !eventFlag) {
360     ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
361     return sendError(Result::INVALID_ARGUMENTS);
362   }
363 
364   sp<WriteThread> writeThread =
365       new WriteThread(&mStopWriteThread, mStream.get(), commandMQ.get(),
366                       dataMQ.get(), statusMQ.get(), eventFlag.get());
367   status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
368   if (status != ::android::OK) {
369     ALOGW("failed to start writer thread: %s", strerror(-status));
370     return sendError(Result::INVALID_ARGUMENTS);
371   }
372 
373   mCommandMQ = std::move(commandMQ);
374   mDataMQ = std::move(dataMQ);
375   mStatusMQ = std::move(statusMQ);
376   mEventFlag = std::move(eventFlag);
377   mWriteThread = std::move(writeThread);
378   threadInfo.pid = getpid();
379   threadInfo.tid = mWriteThread->getTid();
380   _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(),
381            *mStatusMQ->getDesc(), threadInfo);
382   return Void();
383 }
384 
getRenderPosition(getRenderPosition_cb _hidl_cb)385 Return<void> StreamOutImpl::getRenderPosition(getRenderPosition_cb _hidl_cb) {
386   uint32_t dspFrames = 0;
387   Result res = mStream->getRenderPosition(&dspFrames);
388   _hidl_cb(res, dspFrames);
389   return Void();
390 }
391 
getNextWriteTimestamp(getNextWriteTimestamp_cb _hidl_cb)392 Return<void> StreamOutImpl::getNextWriteTimestamp(
393     getNextWriteTimestamp_cb _hidl_cb) {
394   int64_t timestamp = 0;
395   Result res = mStream->getNextWriteTimestamp(&timestamp);
396   _hidl_cb(res, timestamp);
397   return Void();
398 }
399 
setCallback(const sp<IStreamOutCallback> & callback)400 Return<Result> StreamOutImpl::setCallback(
401     const sp<IStreamOutCallback>& callback) {
402   return Result::NOT_SUPPORTED;
403 }
404 
clearCallback()405 Return<Result> StreamOutImpl::clearCallback() { return Result::NOT_SUPPORTED; }
406 
supportsPauseAndResume(supportsPauseAndResume_cb _hidl_cb)407 Return<void> StreamOutImpl::supportsPauseAndResume(
408     supportsPauseAndResume_cb _hidl_cb) {
409   _hidl_cb(true, true);
410   return Void();
411 }
412 
pause()413 Return<Result> StreamOutImpl::pause() { return mStream->pause(); }
414 
resume()415 Return<Result> StreamOutImpl::resume() { return mStream->resume(); }
416 
supportsDrain()417 Return<bool> StreamOutImpl::supportsDrain() { return mStream->supportsDrain(); }
418 
drain(AudioDrain type)419 Return<Result> StreamOutImpl::drain(AudioDrain type) {
420   return mStream->drain(type);
421 }
422 
flush()423 Return<Result> StreamOutImpl::flush() { return mStream->flush(); }
424 
getPresentationPosition(getPresentationPosition_cb _hidl_cb)425 Return<void> StreamOutImpl::getPresentationPosition(
426     getPresentationPosition_cb _hidl_cb) {
427   uint64_t frames = 0;
428   TimeSpec ts = {0, 0};
429   Result result = mStream->getPresentationPosition(&frames, &ts);
430   _hidl_cb(result, frames, ts);
431   return Void();
432 }
433 
start()434 Return<Result> StreamOutImpl::start() { return Result::NOT_SUPPORTED; }
435 
stop()436 Return<Result> StreamOutImpl::stop() { return Result::NOT_SUPPORTED; }
437 
createMmapBuffer(int32_t minSizeFrames,createMmapBuffer_cb _hidl_cb)438 Return<void> StreamOutImpl::createMmapBuffer(int32_t minSizeFrames,
439                                              createMmapBuffer_cb _hidl_cb) {
440   _hidl_cb(Result::NOT_SUPPORTED, MmapBufferInfo());
441   return Void();
442 }
443 
getMmapPosition(getMmapPosition_cb _hidl_cb)444 Return<void> StreamOutImpl::getMmapPosition(getMmapPosition_cb _hidl_cb) {
445   _hidl_cb(Result::NOT_SUPPORTED, MmapPosition());
446   return Void();
447 }
448 
updateSourceMetadata(const SourceMetadata & sourceMetadata)449 Return<void> StreamOutImpl::updateSourceMetadata(
450     const SourceMetadata& sourceMetadata) {
451   return Void();
452 }
453 
selectPresentation(int32_t presentationId,int32_t programId)454 Return<Result> StreamOutImpl::selectPresentation(int32_t presentationId,
455                                                  int32_t programId) {
456   return Result::NOT_SUPPORTED;
457 }
458 
459 }  // namespace CPP_VERSION
460 }  // namespace audio_proxy
461