• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Stream.h"
20 
21 #include <condition_variable>
22 #include <future>
23 #include <list>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <string>
28 #include <unordered_set>
29 #include <vector>
30 
31 #include <utils/AndroidThreads.h>
32 
33 namespace android::soundpool {
34 
35 // TODO: Move helper classes to a utility file, with separate test.
36 
37 /**
38  * JavaThread is used like std::thread but for threads that may call the JVM.
39  *
40  * std::thread does not easily attach to the JVM.  We need JVM capable threads
41  * from createThreadEtc() since android binder call optimization may attempt to
42  * call back into Java if the SoundPool runs in system server.
43  *
44  *
45  * No locking is required - the member variables are inherently thread-safe.
46  */
47 class JavaThread {
48 public:
JavaThread(std::function<void ()> f,const char * name)49     JavaThread(std::function<void()> f, const char *name)
50         : mF{std::move(f)} {
51         createThreadEtc(staticFunction, this, name);
52     }
53 
54     JavaThread(JavaThread &&) = delete; // uses "this" ptr, not moveable.
55 
~JavaThread()56     ~JavaThread() {
57         join(); // manually block until the future is ready as std::future
58                 // destructor doesn't block unless it comes from std::async
59                 // and it is the last reference to shared state.
60     }
61 
join()62     void join() const {
63         mFuture.wait();
64     }
65 
isClosed()66     bool isClosed() const {
67         return mIsClosed;
68     }
69 
70 private:
staticFunction(void * data)71     static int staticFunction(void *data) {
72         JavaThread *jt = static_cast<JavaThread *>(data);
73         jt->mF();
74         jt->mIsClosed = true;  // set the flag that we are closed
75                                // now before we allow the destructor to execute;
76                                // otherwise there may be a use after free.
77         jt->mPromise.set_value();
78         return 0;
79     }
80 
81     // No locking is provided as these variables are initialized in the constructor
82     // and the members referenced are thread-safe objects.
83     // (mFuture.wait() can block multiple threads.)
84     // Note the order of member variables is reversed for destructor.
85     const std::function<void()> mF;
86     // Used in join() to block until the thread completes.
87     // See https://en.cppreference.com/w/cpp/thread/promise for the void specialization of
88     // promise.
89     std::promise<void>          mPromise;
90     std::future<void>           mFuture{mPromise.get_future()};
91     std::atomic_bool            mIsClosed = false;
92 };
93 
94 /**
95  * The ThreadPool manages thread lifetimes of SoundPool worker threads.
96  *
97  * TODO: the (eventual) goal of ThreadPool is to transparently and cooperatively
98  * maximize CPU utilization while avoiding starvation of other applications.
99  * Some possibilities:
100  *
101  * We should create worker threads when we have SoundPool work and the system is idle.
102  * CPU cycles are "use-it-or-lose-it" when the system is idle.
103  *
104  * We should adjust the priority of worker threads so that the second (and subsequent) worker
105  * threads have lower priority (should we try to promote priority also?).
106  *
107  * We should throttle the spawning of new worker threads, spacing over time, to avoid
108  * creating too many new threads all at once, on initialization.
109  */
110 class ThreadPool {
111 public:
ThreadPool(size_t maxThreadCount,std::string name)112     ThreadPool(size_t maxThreadCount, std::string name)
113         : mMaxThreadCount(maxThreadCount)
114         , mName{std::move(name)} { }
115 
~ThreadPool()116     ~ThreadPool() { quit(); }
117 
getActiveThreadCount()118     size_t getActiveThreadCount() const { return mActiveThreadCount; }
getMaxThreadCount()119     size_t getMaxThreadCount() const { return mMaxThreadCount; }
120 
quit()121     void quit() {
122         std::list<std::unique_ptr<JavaThread>> threads;
123         {
124             std::lock_guard lock(mThreadLock);
125             if (mQuit) return;  // already joined.
126             mQuit = true;
127             threads = std::move(mThreads);
128             mThreads.clear();
129         }
130         // mQuit set under lock, no more threads will be created.
131         for (auto &thread : threads) {
132             thread->join();
133             thread.reset();
134         }
135         LOG_ALWAYS_FATAL_IF(mActiveThreadCount != 0,
136                 "Invalid Active Threads: %zu", (size_t)mActiveThreadCount);
137     }
138 
139     // returns a non-zero id if successful, the id is to help logging messages.
launch(std::function<void (int32_t)> f)140     int32_t launch(std::function<void(int32_t /* id */)> f) {
141         std::list<std::unique_ptr<JavaThread>> threadsToRelease; // release outside of lock.
142         std::lock_guard lock(mThreadLock);
143         if (mQuit) return 0;  // ignore if we have quit
144 
145         // clean up threads.
146         for (auto it = mThreads.begin(); it != mThreads.end(); ) {
147             if ((*it)->isClosed()) {
148                 threadsToRelease.emplace_back(std::move(*it));
149                it = mThreads.erase(it);
150             } else {
151                ++it;
152             }
153         }
154 
155         const size_t threadCount = mThreads.size();
156         if (threadCount < mMaxThreadCount) {
157             // if the id wraps, we don't care about collisions.  it's just for logging.
158             mNextThreadId = mNextThreadId == INT32_MAX ? 1 : ++mNextThreadId;
159             const int32_t id = mNextThreadId;
160             mThreads.emplace_back(std::make_unique<JavaThread>(
161                     [this, id, mf = std::move(f)] { mf(id); --mActiveThreadCount; },
162                     (mName + std::to_string(id)).c_str()));
163             ++mActiveThreadCount;
164             return id;
165         }
166         return 0;
167     }
168 
169     // TODO: launch only if load average is low.
170     // This gets the load average
171     // See also std::thread::hardware_concurrency() for the concurrent capability.
getLoadAvg()172     static double getLoadAvg() {
173         double loadAvg[1];
174         if (getloadavg(loadAvg, std::size(loadAvg)) > 0) {
175             return loadAvg[0];
176         }
177         return -1.;
178     }
179 
180 private:
181     const size_t            mMaxThreadCount;
182     const std::string       mName;
183 
184     std::atomic_size_t      mActiveThreadCount = 0;
185 
186     std::mutex              mThreadLock;
187     bool                    mQuit GUARDED_BY(mThreadLock) = false;
188     int32_t                 mNextThreadId GUARDED_BY(mThreadLock) = 0;
189     std::list<std::unique_ptr<JavaThread>> mThreads GUARDED_BY(mThreadLock);
190 };
191 
192 /**
193  * A Perfect HashTable for IDs (key) to pointers (value).
194  *
195  * There are no collisions.  Why? because we generate the IDs for you to look up :-).
196  *
197  * The goal of this hash table is to map an integer ID handle > 0 to a pointer.
198  * We give these IDs in monotonic order (though we may skip if it were to cause a collision).
199  *
200  * The size of the hashtable must be large enough to accommodate the max number of keys.
201  * We suggest 2x.
202  *
203  * Readers are lockless
204  * Single writer could be lockless, but we allow multiple writers through an internal lock.
205  *
206  * For the Key type K, valid keys generated are > 0 (signed or unsigned)
207  * For the Value type V, values are pointers - nullptr means empty.
208  */
209 template <typename K, typename V>
210 class PerfectHash {
211 public:
PerfectHash(size_t hashCapacity)212     PerfectHash(size_t hashCapacity)
213         : mHashCapacity(hashCapacity)
214         , mK2V{new std::atomic<V>[hashCapacity]()} {
215     }
216 
217     // Generate a key for a value V.
218     // There is a testing function getKforV() which checks what the value reports as its key.
219     //
220     // Calls back into getKforV under lock.
221     //
222     // We expect that the hashCapacity is 2x the number of stored keys in order
223     // to have one or two tries to find an empty slot
224     K generateKey(V value, std::function<K(V)> getKforV, K oldKey = 0) {
225         std::lock_guard lock(mHashLock);
226         // try to remove the old key.
227         if (oldKey > 0) {  // key valid
228             const V v = getValue(oldKey);
229             if (v != nullptr) {  // value still valid
230                 const K atPosition = getKforV(v);
231                 if (atPosition < 0 ||            // invalid value
232                         atPosition == oldKey ||  // value's key still valid and matches old key
233                         ((atPosition ^ oldKey) & (mHashCapacity - 1)) != 0) { // stale key entry
234                     getValue(oldKey) = nullptr;  // invalidate
235                 }
236             } // else if value is invalid, no need to invalidate.
237         }
238         // check if we are invalidating only.
239         if (value == nullptr) return 0;
240         // now insert the new value and return the key.
241         size_t tries = 0;
242         for (; tries < mHashCapacity; ++tries) {
243             mNextKey = mNextKey == std::numeric_limits<K>::max() ? 1 : mNextKey + 1;
244             const V v = getValue(mNextKey);
245             //ALOGD("tries: %zu, key:%d value:%p", tries, (int)mNextKey, v);
246             if (v == nullptr) break; // empty
247             const K atPosition = getKforV(v);
248             //ALOGD("tries: %zu  key atPosition:%d", tries, (int)atPosition);
249             if (atPosition < 0 || // invalid value
250                     ((atPosition ^ mNextKey) & (mHashCapacity - 1)) != 0) { // stale key entry
251                 break;
252            }
253         }
254         LOG_ALWAYS_FATAL_IF(tries == mHashCapacity, "hash table overflow!");
255         //ALOGD("%s: found after %zu tries", __func__, tries);
256         getValue(mNextKey) = value;
257         return mNextKey;
258     }
259 
getValue(K key)260     std::atomic<V> &getValue(K key) { return mK2V[key & (mHashCapacity - 1)]; }
getValue(K key)261     const std::atomic_int32_t &getValue(K key) const { return mK2V[key & (mHashCapacity - 1)]; }
262 
263 private:
264     mutable std::mutex          mHashLock;
265     const size_t                mHashCapacity; // size of mK2V no lock needed.
266     std::unique_ptr<std::atomic<V>[]> mK2V;    // no lock needed for read access.
GUARDED_BY(mHashLock)267     K                           mNextKey GUARDED_BY(mHashLock) {};
268 };
269 
270 /**
271  * StreamMap contains the all the valid streams available to SoundPool.
272  *
273  * There is no Lock required for this class because the streams are
274  * allocated in the constructor, the lookup is lockless, and the Streams
275  * returned are locked internally.
276  *
277  * The lookup uses a perfect hash.
278  * It is possible to use a lockless hash table or to use a stripe-locked concurrent
279  * hashmap for essentially lock-free lookup.
280  *
281  * This follows Map-Reduce parallelism model.
282  * https://en.wikipedia.org/wiki/MapReduce
283  *
284  * Conceivably the forEach could be parallelized using std::for_each with a
285  * std::execution::par policy.
286  *
287  * https://en.cppreference.com/w/cpp/algorithm/for_each
288  */
289 class StreamMap {
290 public:
291     explicit StreamMap(int32_t streams);
292 
293     // Returns the stream associated with streamID or nullptr if not found.
294     // This need not be locked.
295     // The stream ID will never migrate to another Stream, but it may change
296     // underneath you.  The Stream operations that take a streamID will confirm
297     // that the streamID matches under the Stream lock before executing otherwise
298     // it ignores the command as stale.
299     Stream* findStream(int32_t streamID) const;
300 
301     // Iterates through the stream pool applying the function f.
302     // Since this enumerates over every single stream, it is unlocked.
303     //
304     // See related: https://en.cppreference.com/w/cpp/algorithm/for_each
forEach(std::function<void (const Stream *)> f)305     void forEach(std::function<void(const Stream *)>f) const {
306         for (size_t i = 0; i < mStreamPoolSize; ++i) {
307             f(&mStreamPool[i]);
308         }
309     }
310 
forEach(std::function<void (Stream *)> f)311     void forEach(std::function<void(Stream *)>f) {
312         for (size_t i = 0; i < mStreamPoolSize; ++i) {
313             f(&mStreamPool[i]);
314         }
315     }
316 
317     // Returns the pair stream for a given Stream.
318     // This need not be locked as it is a property of the pointer address.
getPairStream(const Stream * stream)319     Stream* getPairStream(const Stream* stream) const {
320         const size_t index = streamPosition(stream);
321         return &mStreamPool[index ^ 1];
322     }
323 
324     // find the position of the stream in mStreamPool array.
325     size_t streamPosition(const Stream* stream) const; // no lock needed
326 
getStreamMapSize()327     size_t getStreamMapSize() const {
328         return mStreamPoolSize;
329     }
330 
331     // find the next valid ID for a stream and store in hash table.
332     int32_t getNextIdForStream(Stream* stream) const;
333 
334 private:
335 
336     // use the hash table to attempt to find the stream.
337     // nullptr is returned if the lookup fails.
338     Stream* lookupStreamFromId(int32_t streamID) const;
339 
340     // The stream pool is initialized in the constructor, effectively const.
341     // no locking required for access.
342     //
343     // The constructor parameter "streams" results in streams pairs of streams.
344     // We have twice as many streams because we wish to return a streamID "handle"
345     // back to the app immediately, while we may be stopping the other stream in the
346     // pair to get its AudioTrack :-).
347     //
348     // Of the stream pair, only one of the streams may have an AudioTrack.
349     // The fixed association of a stream pair allows callbacks from the AudioTrack
350     // to be associated properly to either one or the other of the stream pair.
351     //
352     // TODO: The stream pair arrangement can be removed if we have better AudioTrack
353     // callback handling (being able to remove and change the callback after construction).
354     //
355     // Streams may be accessed anytime off of the stream pool
356     // as there is internal locking on each stream.
357     std::unique_ptr<Stream[]>   mStreamPool;        // no lock needed for access.
358     size_t                      mStreamPoolSize;    // no lock needed for access.
359 
360     // In order to find the Stream from a StreamID, we could do a linear lookup in mStreamPool.
361     // As an alternative, one could use stripe-locked or lock-free concurrent hashtables.
362     //
363     // When considering linear search vs hashmap, verify the typical use-case size.
364     // Linear search is faster than std::unordered_map (circa 2018) for less than 40 elements.
365     // [ Skarupke, M. (2018), "You Can Do Better than std::unordered_map: New and Recent
366     // Improvements to Hash Table Performance." C++Now 2018. cppnow.org, see
367     // https://www.youtube.com/watch?v=M2fKMP47slQ ]
368     //
369     // Here, we use a PerfectHash of Id to Stream *, since we can control the
370     // StreamID returned to the user.  This allows O(1) read access to mStreamPool lock-free.
371     //
372     // We prefer that the next stream ID is monotonic for aesthetic reasons
373     // (if we didn't care about monotonicity, a simple method is to apply a generation count
374     // to each stream in the unused upper bits of its index in mStreamPool for the id).
375     //
376     std::unique_ptr<PerfectHash<int32_t, Stream *>> mPerfectHash;
377 };
378 
379 /**
380  * StreamManager is used to manage the streams (accessed by StreamID from Java).
381  *
382  * Locking order (proceeds from application to component).
383  *  SoundPool mApiLock (if needed) -> StreamManager mStreamManagerLock
384  *                                 -> pair Stream mLock -> queued Stream mLock
385  */
386 class StreamManager : public StreamMap {
387 public:
388     // Note: the SoundPool pointer is only used for stream initialization.
389     // It is not stored in StreamManager.
390     StreamManager(int32_t streams, size_t threads, const audio_attributes_t& attributes,
391             std::string opPackageName);
392     ~StreamManager();
393 
394     // Returns positive streamID on success, 0 on failure.  This is locked.
395     int32_t queueForPlay(const std::shared_ptr<Sound> &sound,
396             int32_t soundID, float leftVolume, float rightVolume,
397             int32_t priority, int32_t loop, float rate)
398             NO_THREAD_SAFETY_ANALYSIS; // uses unique_lock
399 
400     ///////////////////////////////////////////////////////////////////////
401     // Called from soundpool::Stream
402 
getAttributes()403     const audio_attributes_t* getAttributes() const { return &mAttributes; }
404 
getOpPackageName()405     const std::string& getOpPackageName() const { return mOpPackageName; }
406 
407     // Moves the stream to the restart queue (called upon BUFFER_END of the static track)
408     // this is locked internally.
409     // If activeStreamIDToMatch is nonzero, it will only move to the restart queue
410     // if the streamIDToMatch is found on the active queue.
411     void moveToRestartQueue(Stream* stream, int32_t activeStreamIDToMatch = 0);
412 
413 private:
414 
415     void run(int32_t id) NO_THREAD_SAFETY_ANALYSIS; // worker thread, takes unique_lock.
416     void dump() const;                           // no lock needed
417 
418     // returns true if more worker threads are needed.
needMoreThreads_l()419     bool needMoreThreads_l() REQUIRES(mStreamManagerLock) {
420         return mRestartStreams.size() > 0 &&
421                 (mThreadPool->getActiveThreadCount() == 0
422                 || std::distance(mRestartStreams.begin(),
423                         mRestartStreams.upper_bound(systemTime()))
424                         > (ptrdiff_t)mThreadPool->getActiveThreadCount());
425     }
426 
427     // returns true if the stream was added.
428     bool moveToRestartQueue_l(
429             Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock);
430     // returns number of queues the stream was removed from (should be 0 or 1);
431     // a special code of -1 is returned if activeStreamIDToMatch is > 0 and
432     // the stream wasn't found on the active queue.
433     ssize_t removeFromQueues_l(
434             Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock);
435     void addToRestartQueue_l(Stream *stream) REQUIRES(mStreamManagerLock);
436     void addToActiveQueue_l(Stream *stream) REQUIRES(mStreamManagerLock);
437     void sanityCheckQueue_l() const REQUIRES(mStreamManagerLock);
438 
439     const audio_attributes_t mAttributes;
440     const std::string mOpPackageName;
441 
442    // For legacy compatibility, we lock the stream manager on stop when
443    // there is only one stream.  This allows a play to be called immediately
444    // after stopping, otherwise it is possible that the play might be discarded
445    // (returns 0) because that stream may be in the worker thread call to stop.
446     const bool mLockStreamManagerStop;
447 
448     std::unique_ptr<ThreadPool> mThreadPool;                  // locked internally
449 
450     // mStreamManagerLock is used to lock access for transitions between the
451     // 4 stream queues by the Manager Thread or by the user initiated play().
452     // A stream pair has exactly one stream on exactly one of the queues.
453     std::mutex                  mStreamManagerLock;
454     std::condition_variable     mStreamManagerCondition GUARDED_BY(mStreamManagerLock);
455 
456     bool                        mQuit GUARDED_BY(mStreamManagerLock) = false;
457 
458     // There are constructor arg "streams" pairs of streams, only one of each
459     // pair on the 4 stream queues below.  The other stream in the pair serves as
460     // placeholder to accumulate user changes, pending actual availability of the
461     // AudioTrack, as it may be in use, requiring stop-then-restart.
462     //
463     // The 4 queues are implemented in the appropriate STL container based on perceived
464     // optimality.
465 
466     // 1) mRestartStreams: Streams awaiting stop.
467     // The paired stream may be active (but with no AudioTrack), and will be restarted
468     // with an active AudioTrack when the current stream is stopped.
469     std::multimap<int64_t /* stopTimeNs */, Stream*>
470                                 mRestartStreams GUARDED_BY(mStreamManagerLock);
471 
472     // 2) mActiveStreams: Streams that are active.
473     // The paired stream will be inactive.
474     // This is in order of specified by kStealActiveStream_OldestFirst
475     std::list<Stream*>          mActiveStreams GUARDED_BY(mStreamManagerLock);
476 
477     // 3) mAvailableStreams: Streams that are inactive.
478     // The paired stream will also be inactive.
479     // No particular order.
480     std::unordered_set<Stream*> mAvailableStreams GUARDED_BY(mStreamManagerLock);
481 
482     // 4) mProcessingStreams: Streams that are being processed by the ManagerThreads
483     // When on this queue, the stream and its pair are not available for stealing.
484     // Each ManagerThread will have at most one stream on the mProcessingStreams queue.
485     // The paired stream may be active or restarting.
486     // No particular order.
487     std::unordered_set<Stream*> mProcessingStreams GUARDED_BY(mStreamManagerLock);
488 };
489 
490 } // namespace android::soundpool
491