1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "OutputStreamImpl.h"
16
17 #include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
18 #include <aidl/device/google/atv/audio_proxy/PresentationPosition.h>
19 #include <android-base/logging.h>
20 #include <time.h>
21
22 #include "AudioProxyClientError.h"
23 #include "AudioProxyStreamOut.h"
24
25 using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
26 using aidl::device::google::atv::audio_proxy::PresentationPosition;
27 using android::status_t;
28
29 namespace audio_proxy {
30 namespace {
31 // 1GB
32 constexpr uint32_t kMaxBufferSize = 1 << 30;
33
deleteEventFlag(EventFlag * obj)34 void deleteEventFlag(EventFlag* obj) {
35 if (!obj) {
36 return;
37 }
38
39 status_t status = EventFlag::deleteEventFlag(&obj);
40 if (status) {
41 LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
42 }
43 }
44
45 class WriteThread : public Thread {
46 public:
47 // WriteThread's lifespan never exceeds StreamOut's lifespan.
48 WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
49 OutputStreamImpl::DataMQ* dataMQ,
50 OutputStreamImpl::StatusMQ* statusMQ, EventFlag* eventFlag);
51
52 ~WriteThread() override;
53
54 private:
55 bool threadLoop() override;
56
57 PresentationPosition doGetPresentationPosition();
58 int64_t doWrite();
59
60 std::atomic<bool>* const mStop;
61 AudioProxyStreamOut* mStream;
62 OutputStreamImpl::DataMQ* const mDataMQ;
63 OutputStreamImpl::StatusMQ* const mStatusMQ;
64 EventFlag* const mEventFlag;
65 const std::unique_ptr<int8_t[]> mBuffer;
66 };
67
WriteThread(std::atomic<bool> * stop,AudioProxyStreamOut * stream,OutputStreamImpl::DataMQ * dataMQ,OutputStreamImpl::StatusMQ * statusMQ,EventFlag * eventFlag)68 WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
69 OutputStreamImpl::DataMQ* dataMQ,
70 OutputStreamImpl::StatusMQ* statusMQ,
71 EventFlag* eventFlag)
72 : Thread(false /*canCallJava*/),
73 mStop(stop),
74 mStream(stream),
75 mDataMQ(dataMQ),
76 mStatusMQ(statusMQ),
77 mEventFlag(eventFlag),
78 mBuffer(new int8_t[mDataMQ->getQuantumCount()]) {}
79
80 WriteThread::~WriteThread() = default;
81
doWrite()82 int64_t WriteThread::doWrite() {
83 const size_t availToRead = mDataMQ->availableToRead();
84 if (availToRead == 0) {
85 return 0;
86 }
87
88 if (!mDataMQ->read(&mBuffer[0], availToRead)) {
89 return 0;
90 }
91
92 return mStream->write(&mBuffer[0], availToRead);
93 }
94
doGetPresentationPosition()95 PresentationPosition WriteThread::doGetPresentationPosition() {
96 PresentationPosition position;
97 mStream->getPresentationPosition(&position.frames, &position.timestamp);
98 return position;
99 }
100
threadLoop()101 bool WriteThread::threadLoop() {
102 // This implementation doesn't return control back to the Thread until the
103 // parent thread decides to stop, as the Thread uses mutexes, and this can
104 // lead to priority inversion.
105 while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
106 uint32_t efState = 0;
107 mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY),
108 &efState);
109 if (!(efState & static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY))) {
110 continue; // Nothing to do.
111 }
112
113 WriteStatus status;
114 status.written = doWrite();
115 status.position = doGetPresentationPosition();
116
117 if (!mStatusMQ->write(&status)) {
118 LOG(ERROR) << "status message queue write failed.";
119 }
120 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_FULL));
121 }
122
123 return false;
124 }
125
126 } // namespace
127
OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)128 OutputStreamImpl::OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)
129 : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
130
~OutputStreamImpl()131 OutputStreamImpl::~OutputStreamImpl() {
132 closeImpl();
133
134 if (mWriteThread) {
135 status_t status = mWriteThread->join();
136 if (status) {
137 LOG(ERROR) << "write thread exit error: " << strerror(-status);
138 }
139 }
140
141 mEventFlag.reset();
142 }
143
standby()144 ndk::ScopedAStatus OutputStreamImpl::standby() {
145 mStream->standby();
146 return ndk::ScopedAStatus::ok();
147 }
148
close()149 ndk::ScopedAStatus OutputStreamImpl::close() { return closeImpl(); }
150
closeImpl()151 ndk::ScopedAStatus OutputStreamImpl::closeImpl() {
152 if (mStopWriteThread.load(
153 std::memory_order_relaxed)) { // only this thread writes
154 return ndk::ScopedAStatus::ok();
155 }
156 mStopWriteThread.store(true, std::memory_order_release);
157 if (mEventFlag) {
158 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
159 }
160
161 return ndk::ScopedAStatus::ok();
162 }
163
prepareForWriting(int32_t frameSize,int32_t framesCount,DataMQDesc * dataMQDesc,StatusMQDesc * statusMQDesc)164 ndk::ScopedAStatus OutputStreamImpl::prepareForWriting(
165 int32_t frameSize, int32_t framesCount, DataMQDesc* dataMQDesc,
166 StatusMQDesc* statusMQDesc) {
167 if (mDataMQ) {
168 LOG(ERROR) << "the client attempted to call prepareForWriting twice.";
169 return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
170 }
171
172 if (frameSize == 0 || framesCount == 0) {
173 LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
174 << framesCount << ")";
175 return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
176 }
177
178 if (frameSize > kMaxBufferSize / framesCount) {
179 LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
180 << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
181 return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
182 }
183
184 auto dataMQ =
185 std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
186 if (!dataMQ->isValid()) {
187 LOG(ERROR) << "data MQ is invalid";
188 return ndk::ScopedAStatus::fromServiceSpecificError(
189 ERROR_FMQ_CREATION_FAILURE);
190 }
191
192 auto statusMQ = std::make_unique<StatusMQ>(1);
193 if (!statusMQ->isValid()) {
194 LOG(ERROR) << "status MQ is invalid";
195 return ndk::ScopedAStatus::fromServiceSpecificError(
196 ERROR_FMQ_CREATION_FAILURE);
197 }
198
199 EventFlag* rawEventFlag = nullptr;
200 status_t status =
201 EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
202 std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
203 deleteEventFlag);
204 if (status != ::android::OK || !eventFlag) {
205 LOG(ERROR) << "failed creating event flag for data MQ: "
206 << strerror(-status);
207 return ndk::ScopedAStatus::fromServiceSpecificError(
208 ERROR_FMQ_CREATION_FAILURE);
209 }
210
211 sp<WriteThread> writeThread =
212 new WriteThread(&mStopWriteThread, mStream.get(), dataMQ.get(),
213 statusMQ.get(), eventFlag.get());
214 status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
215 if (status != ::android::OK) {
216 LOG(ERROR) << "failed to start writer thread: " << strerror(-status);
217 return ndk::ScopedAStatus::fromServiceSpecificError(
218 ERROR_FMQ_CREATION_FAILURE);
219 }
220
221 mDataMQ = std::move(dataMQ);
222 mStatusMQ = std::move(statusMQ);
223 mEventFlag = std::move(eventFlag);
224 mWriteThread = std::move(writeThread);
225
226 *dataMQDesc = mDataMQ->dupeDesc();
227 *statusMQDesc = mStatusMQ->dupeDesc();
228
229 return ndk::ScopedAStatus::ok();
230 }
231
pause()232 ndk::ScopedAStatus OutputStreamImpl::pause() {
233 mStream->pause();
234 return ndk::ScopedAStatus::ok();
235 }
236
resume()237 ndk::ScopedAStatus OutputStreamImpl::resume() {
238 mStream->resume();
239 return ndk::ScopedAStatus::ok();
240 }
241
drain(AudioDrain type)242 ndk::ScopedAStatus OutputStreamImpl::drain(AudioDrain type) {
243 mStream->drain(type);
244 return ndk::ScopedAStatus::ok();
245 }
246
flush()247 ndk::ScopedAStatus OutputStreamImpl::flush() {
248 mStream->flush();
249 return ndk::ScopedAStatus::ok();
250 }
251
setVolume(float left,float right)252 ndk::ScopedAStatus OutputStreamImpl::setVolume(float left, float right) {
253 mStream->setVolume(left, right);
254 return ndk::ScopedAStatus::ok();
255 }
256
257 } // namespace audio_proxy