• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "OutputStreamImpl.h"
16 
17 #include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
18 #include <aidl/device/google/atv/audio_proxy/PresentationPosition.h>
19 #include <android-base/logging.h>
20 #include <time.h>
21 
22 #include "AudioProxyClientError.h"
23 #include "AudioProxyStreamOut.h"
24 
25 using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
26 using aidl::device::google::atv::audio_proxy::PresentationPosition;
27 using android::status_t;
28 
29 namespace audio_proxy {
30 namespace {
31 // 1GB
32 constexpr uint32_t kMaxBufferSize = 1 << 30;
33 
deleteEventFlag(EventFlag * obj)34 void deleteEventFlag(EventFlag* obj) {
35   if (!obj) {
36     return;
37   }
38 
39   status_t status = EventFlag::deleteEventFlag(&obj);
40   if (status) {
41     LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
42   }
43 }
44 
45 class WriteThread : public Thread {
46  public:
47   // WriteThread's lifespan never exceeds StreamOut's lifespan.
48   WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
49               OutputStreamImpl::DataMQ* dataMQ,
50               OutputStreamImpl::StatusMQ* statusMQ, EventFlag* eventFlag);
51 
52   ~WriteThread() override;
53 
54  private:
55   bool threadLoop() override;
56 
57   PresentationPosition doGetPresentationPosition();
58   int64_t doWrite();
59 
60   std::atomic<bool>* const mStop;
61   AudioProxyStreamOut* mStream;
62   OutputStreamImpl::DataMQ* const mDataMQ;
63   OutputStreamImpl::StatusMQ* const mStatusMQ;
64   EventFlag* const mEventFlag;
65   const std::unique_ptr<int8_t[]> mBuffer;
66 };
67 
WriteThread(std::atomic<bool> * stop,AudioProxyStreamOut * stream,OutputStreamImpl::DataMQ * dataMQ,OutputStreamImpl::StatusMQ * statusMQ,EventFlag * eventFlag)68 WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
69                          OutputStreamImpl::DataMQ* dataMQ,
70                          OutputStreamImpl::StatusMQ* statusMQ,
71                          EventFlag* eventFlag)
72     : Thread(false /*canCallJava*/),
73       mStop(stop),
74       mStream(stream),
75       mDataMQ(dataMQ),
76       mStatusMQ(statusMQ),
77       mEventFlag(eventFlag),
78       mBuffer(new int8_t[mDataMQ->getQuantumCount()]) {}
79 
80 WriteThread::~WriteThread() = default;
81 
doWrite()82 int64_t WriteThread::doWrite() {
83   const size_t availToRead = mDataMQ->availableToRead();
84   if (availToRead == 0) {
85     return 0;
86   }
87 
88   if (!mDataMQ->read(&mBuffer[0], availToRead)) {
89     return 0;
90   }
91 
92   return mStream->write(&mBuffer[0], availToRead);
93 }
94 
doGetPresentationPosition()95 PresentationPosition WriteThread::doGetPresentationPosition() {
96   PresentationPosition position;
97   mStream->getPresentationPosition(&position.frames, &position.timestamp);
98   return position;
99 }
100 
threadLoop()101 bool WriteThread::threadLoop() {
102   // This implementation doesn't return control back to the Thread until the
103   // parent thread decides to stop, as the Thread uses mutexes, and this can
104   // lead to priority inversion.
105   while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
106     uint32_t efState = 0;
107     mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY),
108                      &efState);
109     if (!(efState & static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY))) {
110       continue;  // Nothing to do.
111     }
112 
113     WriteStatus status;
114     status.written = doWrite();
115     status.position = doGetPresentationPosition();
116 
117     if (!mStatusMQ->write(&status)) {
118       LOG(ERROR) << "status message queue write failed.";
119     }
120     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_FULL));
121   }
122 
123   return false;
124 }
125 
126 }  // namespace
127 
OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)128 OutputStreamImpl::OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)
129     : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
130 
~OutputStreamImpl()131 OutputStreamImpl::~OutputStreamImpl() {
132   closeImpl();
133 
134   if (mWriteThread) {
135     status_t status = mWriteThread->join();
136     if (status) {
137       LOG(ERROR) << "write thread exit error: " << strerror(-status);
138     }
139   }
140 
141   mEventFlag.reset();
142 }
143 
standby()144 ndk::ScopedAStatus OutputStreamImpl::standby() {
145   mStream->standby();
146   return ndk::ScopedAStatus::ok();
147 }
148 
close()149 ndk::ScopedAStatus OutputStreamImpl::close() { return closeImpl(); }
150 
closeImpl()151 ndk::ScopedAStatus OutputStreamImpl::closeImpl() {
152   if (mStopWriteThread.load(
153           std::memory_order_relaxed)) {  // only this thread writes
154     return ndk::ScopedAStatus::ok();
155   }
156   mStopWriteThread.store(true, std::memory_order_release);
157   if (mEventFlag) {
158     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
159   }
160 
161   return ndk::ScopedAStatus::ok();
162 }
163 
prepareForWriting(int32_t frameSize,int32_t framesCount,DataMQDesc * dataMQDesc,StatusMQDesc * statusMQDesc)164 ndk::ScopedAStatus OutputStreamImpl::prepareForWriting(
165     int32_t frameSize, int32_t framesCount, DataMQDesc* dataMQDesc,
166     StatusMQDesc* statusMQDesc) {
167   if (mDataMQ) {
168     LOG(ERROR) << "the client attempted to call prepareForWriting twice.";
169     return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
170   }
171 
172   if (frameSize == 0 || framesCount == 0) {
173     LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
174                << framesCount << ")";
175     return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
176   }
177 
178   if (frameSize > kMaxBufferSize / framesCount) {
179     LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
180                << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
181     return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
182   }
183 
184   auto dataMQ =
185       std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
186   if (!dataMQ->isValid()) {
187     LOG(ERROR) << "data MQ is invalid";
188     return ndk::ScopedAStatus::fromServiceSpecificError(
189         ERROR_FMQ_CREATION_FAILURE);
190   }
191 
192   auto statusMQ = std::make_unique<StatusMQ>(1);
193   if (!statusMQ->isValid()) {
194     LOG(ERROR) << "status MQ is invalid";
195     return ndk::ScopedAStatus::fromServiceSpecificError(
196         ERROR_FMQ_CREATION_FAILURE);
197   }
198 
199   EventFlag* rawEventFlag = nullptr;
200   status_t status =
201       EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
202   std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
203                                                          deleteEventFlag);
204   if (status != ::android::OK || !eventFlag) {
205     LOG(ERROR) << "failed creating event flag for data MQ: "
206                << strerror(-status);
207     return ndk::ScopedAStatus::fromServiceSpecificError(
208         ERROR_FMQ_CREATION_FAILURE);
209   }
210 
211   sp<WriteThread> writeThread =
212       new WriteThread(&mStopWriteThread, mStream.get(), dataMQ.get(),
213                       statusMQ.get(), eventFlag.get());
214   status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
215   if (status != ::android::OK) {
216     LOG(ERROR) << "failed to start writer thread: " << strerror(-status);
217     return ndk::ScopedAStatus::fromServiceSpecificError(
218         ERROR_FMQ_CREATION_FAILURE);
219   }
220 
221   mDataMQ = std::move(dataMQ);
222   mStatusMQ = std::move(statusMQ);
223   mEventFlag = std::move(eventFlag);
224   mWriteThread = std::move(writeThread);
225 
226   *dataMQDesc = mDataMQ->dupeDesc();
227   *statusMQDesc = mStatusMQ->dupeDesc();
228 
229   return ndk::ScopedAStatus::ok();
230 }
231 
pause()232 ndk::ScopedAStatus OutputStreamImpl::pause() {
233   mStream->pause();
234   return ndk::ScopedAStatus::ok();
235 }
236 
resume()237 ndk::ScopedAStatus OutputStreamImpl::resume() {
238   mStream->resume();
239   return ndk::ScopedAStatus::ok();
240 }
241 
drain(AudioDrain type)242 ndk::ScopedAStatus OutputStreamImpl::drain(AudioDrain type) {
243   mStream->drain(type);
244   return ndk::ScopedAStatus::ok();
245 }
246 
flush()247 ndk::ScopedAStatus OutputStreamImpl::flush() {
248   mStream->flush();
249   return ndk::ScopedAStatus::ok();
250 }
251 
setVolume(float left,float right)252 ndk::ScopedAStatus OutputStreamImpl::setVolume(float left, float right) {
253   mStream->setVolume(left, right);
254   return ndk::ScopedAStatus::ok();
255 }
256 
257 }  // namespace audio_proxy