• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "AHAL_StreamRemoteSubmix"
#include <android-base/logging.h>

#include <chrono>
#include <cmath>
#include <vector>

#include "core-impl/StreamRemoteSubmix.h"
23 
24 using aidl::android::hardware::audio::common::SinkMetadata;
25 using aidl::android::hardware::audio::common::SourceMetadata;
26 using aidl::android::hardware::audio::core::r_submix::SubmixRoute;
27 using aidl::android::media::audio::common::AudioDeviceAddress;
28 using aidl::android::media::audio::common::AudioOffloadInfo;
29 using aidl::android::media::audio::common::MicrophoneDynamicInfo;
30 using aidl::android::media::audio::common::MicrophoneInfo;
31 
32 namespace aidl::android::hardware::audio::core {
33 
StreamRemoteSubmix(StreamContext * context,const Metadata & metadata,const AudioDeviceAddress & deviceAddress)34 StreamRemoteSubmix::StreamRemoteSubmix(StreamContext* context, const Metadata& metadata,
35                                        const AudioDeviceAddress& deviceAddress)
36     : StreamCommonImpl(context, metadata),
37       mDeviceAddress(deviceAddress),
38       mIsInput(isInput(metadata)) {
39     mStreamConfig.frameSize = context->getFrameSize();
40     mStreamConfig.format = context->getFormat();
41     mStreamConfig.channelLayout = context->getChannelLayout();
42     mStreamConfig.sampleRate = context->getSampleRate();
43 }
44 
// Held around the lookups, insertions and removals performed on 'sSubmixRoutes' in this file.
std::mutex StreamRemoteSubmix::sSubmixRoutesLock;
// Registry of submix routes keyed by device address; entries are added by init() and removed
// by shutdown().
std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::sSubmixRoutes;
47 
init()48 ::android::status_t StreamRemoteSubmix::init() {
49     {
50         std::lock_guard guard(sSubmixRoutesLock);
51         auto routeItr = sSubmixRoutes.find(mDeviceAddress);
52         if (routeItr != sSubmixRoutes.end()) {
53             mCurrentRoute = routeItr->second;
54         }
55     }
56     // If route is not available for this port, add it.
57     if (mCurrentRoute == nullptr) {
58         // Initialize the pipe.
59         mCurrentRoute = std::make_shared<SubmixRoute>();
60         if (::android::OK != mCurrentRoute->createPipe(mStreamConfig)) {
61             LOG(ERROR) << __func__ << ": create pipe failed";
62             return ::android::NO_INIT;
63         }
64         {
65             std::lock_guard guard(sSubmixRoutesLock);
66             sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
67         }
68     } else {
69         if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
70             LOG(ERROR) << __func__ << ": invalid stream config";
71             return ::android::NO_INIT;
72         }
73         sp<MonoPipe> sink = mCurrentRoute->getSink();
74         if (sink == nullptr) {
75             LOG(ERROR) << __func__ << ": nullptr sink when opening stream";
76             return ::android::NO_INIT;
77         }
78         // If the sink has been shutdown or pipe recreation is forced, delete the pipe and
79         // recreate it.
80         if (sink->isShutdown()) {
81             LOG(DEBUG) << __func__ << ": Non-nullptr shut down sink when opening stream";
82             if (::android::OK != mCurrentRoute->resetPipe()) {
83                 LOG(ERROR) << __func__ << ": reset pipe failed";
84                 return ::android::NO_INIT;
85             }
86         }
87     }
88 
89     mCurrentRoute->openStream(mIsInput);
90     return ::android::OK;
91 }
92 
drain(StreamDescriptor::DrainMode)93 ::android::status_t StreamRemoteSubmix::drain(StreamDescriptor::DrainMode) {
94     usleep(1000);
95     return ::android::OK;
96 }
97 
flush()98 ::android::status_t StreamRemoteSubmix::flush() {
99     usleep(1000);
100     return ::android::OK;
101 }
102 
pause()103 ::android::status_t StreamRemoteSubmix::pause() {
104     usleep(1000);
105     return ::android::OK;
106 }
107 
standby()108 ::android::status_t StreamRemoteSubmix::standby() {
109     mCurrentRoute->standby(mIsInput);
110     return ::android::OK;
111 }
112 
start()113 ::android::status_t StreamRemoteSubmix::start() {
114     mCurrentRoute->exitStandby(mIsInput);
115     return ::android::OK;
116 }
117 
prepareToClose()118 ndk::ScopedAStatus StreamRemoteSubmix::prepareToClose() {
119     if (!mIsInput) {
120         std::shared_ptr<SubmixRoute> route = nullptr;
121         {
122             std::lock_guard guard(sSubmixRoutesLock);
123             auto routeItr = sSubmixRoutes.find(mDeviceAddress);
124             if (routeItr != sSubmixRoutes.end()) {
125                 route = routeItr->second;
126             }
127         }
128         if (route != nullptr) {
129             sp<MonoPipe> sink = route->getSink();
130             if (sink == nullptr) {
131                 ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
132             }
133             LOG(DEBUG) << __func__ << ": shutting down MonoPipe sink";
134 
135             sink->shutdown(true);
136         } else {
137             LOG(DEBUG) << __func__ << ": stream already closed.";
138             ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
139         }
140     }
141     return ndk::ScopedAStatus::ok();
142 }
143 
144 // Remove references to the specified input and output streams.  When the device no longer
145 // references input and output streams destroy the associated pipe.
shutdown()146 void StreamRemoteSubmix::shutdown() {
147     mCurrentRoute->closeStream(mIsInput);
148     // If all stream instances are closed, we can remove route information for this port.
149     if (!mCurrentRoute->hasAtleastOneStreamOpen()) {
150         mCurrentRoute->releasePipe();
151         LOG(DEBUG) << __func__ << ": pipe destroyed";
152 
153         std::lock_guard guard(sSubmixRoutesLock);
154         sSubmixRoutes.erase(mDeviceAddress);
155     }
156     mCurrentRoute.reset();
157 }
158 
transfer(void * buffer,size_t frameCount,size_t * actualFrameCount,int32_t * latencyMs)159 ::android::status_t StreamRemoteSubmix::transfer(void* buffer, size_t frameCount,
160                                                  size_t* actualFrameCount, int32_t* latencyMs) {
161     *latencyMs = (getStreamPipeSizeInFrames() * MILLIS_PER_SECOND) / mStreamConfig.sampleRate;
162     LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";
163 
164     sp<MonoPipe> sink = mCurrentRoute->getSink();
165     if (sink != nullptr) {
166         if (sink->isShutdown()) {
167             sink.clear();
168             LOG(VERBOSE) << __func__ << ": pipe shutdown, ignoring the transfer.";
169             // the pipe has already been shutdown, this buffer will be lost but we must simulate
170             // timing so we don't drain the output faster than realtime
171             const size_t delayUs = static_cast<size_t>(
172                     std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate));
173             usleep(delayUs);
174 
175             *actualFrameCount = frameCount;
176             return ::android::OK;
177         }
178     } else {
179         LOG(ERROR) << __func__ << ": transfer without a pipe!";
180         return ::android::UNEXPECTED_NULL;
181     }
182     mCurrentRoute->exitStandby(mIsInput);
183     return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
184                      : outWrite(buffer, frameCount, actualFrameCount));
185 }
186 
refinePosition(StreamDescriptor::Position * position)187 ::android::status_t StreamRemoteSubmix::refinePosition(StreamDescriptor::Position* position) {
188     sp<MonoPipeReader> source = mCurrentRoute->getSource();
189     if (source == nullptr) {
190         return ::android::NO_INIT;
191     }
192     const ssize_t framesInPipe = source->availableToRead();
193     if (framesInPipe <= 0) {
194         // No need to update the position frames
195         return ::android::OK;
196     }
197     if (mIsInput) {
198         position->frames += framesInPipe;
199     } else if (position->frames >= framesInPipe) {
200         position->frames -= framesInPipe;
201     }
202     return ::android::OK;
203 }
204 
205 // Calculate the maximum size of the pipe buffer in frames for the specified stream.
getStreamPipeSizeInFrames()206 size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
207     auto pipeConfig = mCurrentRoute->mPipeConfig;
208     const size_t maxFrameSize = std::max(mStreamConfig.frameSize, pipeConfig.frameSize);
209     return (pipeConfig.frameCount * pipeConfig.frameSize) / maxFrameSize;
210 }
211 
outWrite(void * buffer,size_t frameCount,size_t * actualFrameCount)212 ::android::status_t StreamRemoteSubmix::outWrite(void* buffer, size_t frameCount,
213                                                  size_t* actualFrameCount) {
214     sp<MonoPipe> sink = mCurrentRoute->getSink();
215     if (sink != nullptr) {
216         if (sink->isShutdown()) {
217             sink.clear();
218             LOG(VERBOSE) << __func__ << ": pipe shutdown, ignoring the write.";
219             // the pipe has already been shutdown, this buffer will be lost but we must
220             // simulate timing so we don't drain the output faster than realtime
221             const size_t delayUs = static_cast<size_t>(
222                     std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate));
223             usleep(delayUs);
224             *actualFrameCount = frameCount;
225             return ::android::OK;
226         }
227     } else {
228         LOG(FATAL) << __func__ << ": without a pipe!";
229         return ::android::UNKNOWN_ERROR;
230     }
231 
232     const size_t availableToWrite = sink->availableToWrite();
233     // NOTE: sink has been checked above and sink and source life cycles are synchronized
234     sp<MonoPipeReader> source = mCurrentRoute->getSource();
235     // If the write to the sink should be blocked, flush enough frames from the pipe to make space
236     // to write the most recent data.
237     if (!mCurrentRoute->shouldBlockWrite() && availableToWrite < frameCount) {
238         static uint8_t flushBuffer[64];
239         const size_t flushBufferSizeFrames = sizeof(flushBuffer) / mStreamConfig.frameSize;
240         size_t framesToFlushFromSource = frameCount - availableToWrite;
241         LOG(VERBOSE) << __func__ << ": flushing " << framesToFlushFromSource
242                      << " frames from the pipe to avoid blocking";
243         while (framesToFlushFromSource) {
244             const size_t flushSize = std::min(framesToFlushFromSource, flushBufferSizeFrames);
245             framesToFlushFromSource -= flushSize;
246             // read does not block
247             source->read(flushBuffer, flushSize);
248         }
249     }
250 
251     ssize_t writtenFrames = sink->write(buffer, frameCount);
252     if (writtenFrames < 0) {
253         if (writtenFrames == (ssize_t)::android::NEGOTIATE) {
254             LOG(ERROR) << __func__ << ": write to pipe returned NEGOTIATE";
255             sink.clear();
256             *actualFrameCount = 0;
257             return ::android::UNKNOWN_ERROR;
258         } else {
259             // write() returned UNDERRUN or WOULD_BLOCK, retry
260             LOG(ERROR) << __func__ << ": write to pipe returned unexpected " << writtenFrames;
261             writtenFrames = sink->write(buffer, frameCount);
262         }
263     }
264     sink.clear();
265 
266     if (writtenFrames < 0) {
267         LOG(ERROR) << __func__ << ": failed writing to pipe with " << writtenFrames;
268         *actualFrameCount = 0;
269         return ::android::UNKNOWN_ERROR;
270     }
271     LOG(VERBOSE) << __func__ << ": wrote " << writtenFrames << "frames";
272     *actualFrameCount = writtenFrames;
273     return ::android::OK;
274 }
275 
inRead(void * buffer,size_t frameCount,size_t * actualFrameCount)276 ::android::status_t StreamRemoteSubmix::inRead(void* buffer, size_t frameCount,
277                                                size_t* actualFrameCount) {
278     // about to read from audio source
279     sp<MonoPipeReader> source = mCurrentRoute->getSource();
280     if (source == nullptr) {
281         int readErrorCount = mCurrentRoute->notifyReadError();
282         if (readErrorCount < kMaxReadErrorLogs) {
283             LOG(ERROR) << __func__
284                        << ": no audio pipe yet we're trying to read! (not all errors will be "
285                           "logged)";
286         } else {
287             LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
288         }
289         const size_t delayUs = static_cast<size_t>(
290                 std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate));
291         usleep(delayUs);
292         memset(buffer, 0, mStreamConfig.frameSize * frameCount);
293         *actualFrameCount = frameCount;
294         return ::android::OK;
295     }
296 
297     // read the data from the pipe
298     int attempts = 0;
299     const size_t delayUs = static_cast<size_t>(std::roundf(kReadAttemptSleepUs));
300     char* buff = (char*)buffer;
301     size_t remainingFrames = frameCount;
302     int availableToRead = source->availableToRead();
303 
304     while ((remainingFrames > 0) && (availableToRead > 0) && (attempts < kMaxReadFailureAttempts)) {
305         LOG(VERBOSE) << __func__ << ": frames available to read " << availableToRead;
306 
307         ssize_t framesRead = source->read(buff, remainingFrames);
308 
309         LOG(VERBOSE) << __func__ << ": frames read " << framesRead;
310 
311         if (framesRead > 0) {
312             remainingFrames -= framesRead;
313             buff += framesRead * mStreamConfig.frameSize;
314             availableToRead -= framesRead;
315             LOG(VERBOSE) << __func__ << ": (attempts = " << attempts << ") got " << framesRead
316                          << " frames, remaining=" << remainingFrames;
317         } else {
318             attempts++;
319             LOG(WARNING) << __func__ << ": read returned " << framesRead
320                          << " , read failure attempts = " << attempts;
321             usleep(delayUs);
322         }
323     }
324     // done using the source
325     source.clear();
326 
327     if (remainingFrames > 0) {
328         const size_t remainingBytes = remainingFrames * mStreamConfig.frameSize;
329         LOG(VERBOSE) << __func__ << ": clearing remaining_frames = " << remainingFrames;
330         memset(((char*)buffer) + (mStreamConfig.frameSize * frameCount) - remainingBytes, 0,
331                remainingBytes);
332     }
333 
334     long readCounterFrames = mCurrentRoute->updateReadCounterFrames(frameCount);
335     *actualFrameCount = frameCount;
336 
337     // compute how much we need to sleep after reading the data by comparing the wall clock with
338     //   the projected time at which we should return.
339     // wall clock after reading from the pipe
340     auto recordDurationUs = std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime();
341 
342     // readCounterFrames contains the number of frames that have been read since the beginning of
343     // recording (including this call): it's converted to usec and compared to how long we've been
344     // recording for, which gives us how long we must wait to sync the projected recording time, and
345     // the observed recording time.
346     const int projectedVsObservedOffsetUs =
347             std::roundf((readCounterFrames * MICROS_PER_SECOND / mStreamConfig.sampleRate) -
348                         recordDurationUs.count());
349 
350     LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
351                  << " microseconds, will wait: " << projectedVsObservedOffsetUs << " microseconds";
352     if (projectedVsObservedOffsetUs > 0) {
353         usleep(projectedVsObservedOffsetUs);
354     }
355     return ::android::OK;
356 }
357 
// Constructs the input stream: 'context' and 'microphones' are handed to the StreamIn base,
// while the StreamSwitcher base receives a pointer to 'mContextInstance' together with the
// sink metadata.
StreamInRemoteSubmix::StreamInRemoteSubmix(StreamContext&& context,
                                           const SinkMetadata& sinkMetadata,
                                           const std::vector<MicrophoneInfo>& microphones)
    : StreamIn(std::move(context), microphones), StreamSwitcher(&mContextInstance, sinkMetadata) {}
362 
getActiveMicrophones(std::vector<MicrophoneDynamicInfo> * _aidl_return)363 ndk::ScopedAStatus StreamInRemoteSubmix::getActiveMicrophones(
364         std::vector<MicrophoneDynamicInfo>* _aidl_return) {
365     LOG(DEBUG) << __func__ << ": not supported";
366     *_aidl_return = std::vector<MicrophoneDynamicInfo>();
367     return ndk::ScopedAStatus::ok();
368 }
369 
switchCurrentStream(const std::vector<::aidl::android::media::audio::common::AudioDevice> & devices)370 StreamSwitcher::DeviceSwitchBehavior StreamInRemoteSubmix::switchCurrentStream(
371         const std::vector<::aidl::android::media::audio::common::AudioDevice>& devices) {
372     // This implementation effectively postpones stream creation until
373     // receiving the first call to 'setConnectedDevices' with a non-empty list.
374     if (isStubStream()) {
375         if (devices.size() == 1) {
376             auto deviceDesc = devices.front().type;
377             if (deviceDesc.type ==
378                 ::aidl::android::media::audio::common::AudioDeviceType::IN_SUBMIX) {
379                 return DeviceSwitchBehavior::CREATE_NEW_STREAM;
380             }
381             LOG(ERROR) << __func__ << ": Device type " << toString(deviceDesc.type)
382                        << " not supported";
383         } else {
384             LOG(ERROR) << __func__ << ": Only single device supported.";
385         }
386         return DeviceSwitchBehavior::UNSUPPORTED_DEVICES;
387     }
388     return DeviceSwitchBehavior::USE_CURRENT_STREAM;
389 }
390 
createNewStream(const std::vector<::aidl::android::media::audio::common::AudioDevice> & devices,StreamContext * context,const Metadata & metadata)391 std::unique_ptr<StreamCommonInterfaceEx> StreamInRemoteSubmix::createNewStream(
392         const std::vector<::aidl::android::media::audio::common::AudioDevice>& devices,
393         StreamContext* context, const Metadata& metadata) {
394     return std::unique_ptr<StreamCommonInterfaceEx>(
395             new InnerStreamWrapper<StreamRemoteSubmix>(context, metadata, devices.front().address));
396 }
397 
// Constructs the output stream: 'context' and 'offloadInfo' are handed to the StreamOut base,
// while the StreamSwitcher base receives a pointer to 'mContextInstance' together with the
// source metadata.
StreamOutRemoteSubmix::StreamOutRemoteSubmix(StreamContext&& context,
                                             const SourceMetadata& sourceMetadata,
                                             const std::optional<AudioOffloadInfo>& offloadInfo)
    : StreamOut(std::move(context), offloadInfo),
      StreamSwitcher(&mContextInstance, sourceMetadata) {}
403 
switchCurrentStream(const std::vector<::aidl::android::media::audio::common::AudioDevice> & devices)404 StreamSwitcher::DeviceSwitchBehavior StreamOutRemoteSubmix::switchCurrentStream(
405         const std::vector<::aidl::android::media::audio::common::AudioDevice>& devices) {
406     // This implementation effectively postpones stream creation until
407     // receiving the first call to 'setConnectedDevices' with a non-empty list.
408     if (isStubStream()) {
409         if (devices.size() == 1) {
410             auto deviceDesc = devices.front().type;
411             if (deviceDesc.type ==
412                 ::aidl::android::media::audio::common::AudioDeviceType::OUT_SUBMIX) {
413                 return DeviceSwitchBehavior::CREATE_NEW_STREAM;
414             }
415             LOG(ERROR) << __func__ << ": Device type " << toString(deviceDesc.type)
416                        << " not supported";
417         } else {
418             LOG(ERROR) << __func__ << ": Only single device supported.";
419         }
420         return DeviceSwitchBehavior::UNSUPPORTED_DEVICES;
421     }
422     return DeviceSwitchBehavior::USE_CURRENT_STREAM;
423 }
424 
createNewStream(const std::vector<::aidl::android::media::audio::common::AudioDevice> & devices,StreamContext * context,const Metadata & metadata)425 std::unique_ptr<StreamCommonInterfaceEx> StreamOutRemoteSubmix::createNewStream(
426         const std::vector<::aidl::android::media::audio::common::AudioDevice>& devices,
427         StreamContext* context, const Metadata& metadata) {
428     return std::unique_ptr<StreamCommonInterfaceEx>(
429             new InnerStreamWrapper<StreamRemoteSubmix>(context, metadata, devices.front().address));
430 }
431 
432 }  // namespace aidl::android::hardware::audio::core
433