• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #pragma once
18 
#include "Stream.h"

#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <future>
#include <iterator>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_set>
#include <vector>

#include <utils/AndroidThreads.h>
32 
33 namespace android::soundpool {
34 
// TODO: Move helper classes to a utility file, with separate test.
36 
37 /**
38  * JavaThread is used like std::thread but for threads that may call the JVM.
39  *
40  * std::thread does not easily attach to the JVM.  We need JVM capable threads
41  * from createThreadEtc() since android binder call optimization may attempt to
42  * call back into Java if the SoundPool runs in system server.
43  *
44  *
45  * No locking is required - the member variables are inherently thread-safe.
46  */
47 class JavaThread {
48 public:
JavaThread(std::function<void ()> f,const char * name,int32_t threadPriority)49     JavaThread(std::function<void()> f, const char *name, int32_t threadPriority)
50         : mF{std::move(f)} {
51         createThreadEtc(staticFunction, this, name, threadPriority);
52     }
53 
54     JavaThread(JavaThread &&) = delete; // uses "this" ptr, not moveable.
55 
~JavaThread()56     ~JavaThread() {
57         join(); // manually block until the future is ready as std::future
58                 // destructor doesn't block unless it comes from std::async
59                 // and it is the last reference to shared state.
60     }
61 
join()62     void join() const {
63         mFuture.wait();
64     }
65 
isClosed()66     bool isClosed() const {
67         return mIsClosed;
68     }
69 
70 private:
staticFunction(void * data)71     static int staticFunction(void *data) {
72         JavaThread *jt = static_cast<JavaThread *>(data);
73         jt->mF();
74         jt->mIsClosed = true;  // set the flag that we are closed
75                                // now before we allow the destructor to execute;
76                                // otherwise there may be a use after free.
77         jt->mPromise.set_value();
78         return 0;
79     }
80 
81     // No locking is provided as these variables are initialized in the constructor
82     // and the members referenced are thread-safe objects.
83     // (mFuture.wait() can block multiple threads.)
84     // Note the order of member variables is reversed for destructor.
85     const std::function<void()> mF;
86     // Used in join() to block until the thread completes.
87     // See https://en.cppreference.com/w/cpp/thread/promise for the void specialization of
88     // promise.
89     std::promise<void>          mPromise;
90     std::future<void>           mFuture{mPromise.get_future()};
91     std::atomic_bool            mIsClosed = false;
92 };
93 
94 /**
95  * The ThreadPool manages thread lifetimes of SoundPool worker threads.
96  *
97  * TODO: the (eventual) goal of ThreadPool is to transparently and cooperatively
98  * maximize CPU utilization while avoiding starvation of other applications.
99  * Some possibilities:
100  *
101  * We should create worker threads when we have SoundPool work and the system is idle.
102  * CPU cycles are "use-it-or-lose-it" when the system is idle.
103  *
104  * We should adjust the priority of worker threads so that the second (and subsequent) worker
105  * threads have lower priority (should we try to promote priority also?).
106  *
107  * We should throttle the spawning of new worker threads, spacing over time, to avoid
108  * creating too many new threads all at once, on initialization.
109  */
110 class ThreadPool {
111 public:
112     ThreadPool(size_t maxThreadCount, std::string name,
113             int32_t threadPriority = ANDROID_PRIORITY_NORMAL)
mMaxThreadCount(maxThreadCount)114         : mMaxThreadCount(maxThreadCount)
115         , mName{std::move(name)}
116         , mThreadPriority(threadPriority) {}
117 
~ThreadPool()118     ~ThreadPool() { quit(); }
119 
getActiveThreadCount()120     size_t getActiveThreadCount() const { return mActiveThreadCount; }
getMaxThreadCount()121     size_t getMaxThreadCount() const { return mMaxThreadCount; }
122 
quit()123     void quit() {
124         std::list<std::unique_ptr<JavaThread>> threads;
125         {
126             std::lock_guard lock(mThreadLock);
127             if (mQuit) return;  // already joined.
128             mQuit = true;
129             threads = std::move(mThreads);
130             mThreads.clear();
131         }
132         // mQuit set under lock, no more threads will be created.
133         for (auto &thread : threads) {
134             thread->join();
135             thread.reset();
136         }
137         LOG_ALWAYS_FATAL_IF(mActiveThreadCount != 0,
138                 "Invalid Active Threads: %zu", (size_t)mActiveThreadCount);
139     }
140 
141     // returns a non-zero id if successful, the id is to help logging messages.
launch(std::function<void (int32_t)> f)142     int32_t launch(std::function<void(int32_t /* id */)> f) {
143         std::list<std::unique_ptr<JavaThread>> threadsToRelease; // release outside of lock.
144         std::lock_guard lock(mThreadLock);
145         if (mQuit) return 0;  // ignore if we have quit
146 
147         // clean up threads.
148         for (auto it = mThreads.begin(); it != mThreads.end(); ) {
149             if ((*it)->isClosed()) {
150                 threadsToRelease.emplace_back(std::move(*it));
151                it = mThreads.erase(it);
152             } else {
153                ++it;
154             }
155         }
156 
157         const size_t threadCount = mThreads.size();
158         if (threadCount < mMaxThreadCount) {
159             // if the id wraps, we don't care about collisions.  it's just for logging.
160             mNextThreadId = mNextThreadId == INT32_MAX ? 1 : ++mNextThreadId;
161             const int32_t id = mNextThreadId;
162             mThreads.emplace_back(std::make_unique<JavaThread>(
163                     [this, id, mf = std::move(f)] { mf(id); --mActiveThreadCount; },
164                     (mName + std::to_string(id)).c_str(),
165                     mThreadPriority));
166             ++mActiveThreadCount;
167             return id;
168         }
169         return 0;
170     }
171 
172     // TODO: launch only if load average is low.
173     // This gets the load average
174     // See also std::thread::hardware_concurrency() for the concurrent capability.
getLoadAvg()175     static double getLoadAvg() {
176         double loadAvg[1];
177         if (getloadavg(loadAvg, std::size(loadAvg)) > 0) {
178             return loadAvg[0];
179         }
180         return -1.;
181     }
182 
183 private:
184     const size_t            mMaxThreadCount;
185     const std::string       mName;
186     const int32_t           mThreadPriority;
187 
188     std::atomic_size_t      mActiveThreadCount = 0;
189 
190     std::mutex              mThreadLock;
191     bool                    mQuit GUARDED_BY(mThreadLock) = false;
192     int32_t                 mNextThreadId GUARDED_BY(mThreadLock) = 0;
193     std::list<std::unique_ptr<JavaThread>> mThreads GUARDED_BY(mThreadLock);
194 };
195 
196 /**
197  * A Perfect HashTable for IDs (key) to pointers (value).
198  *
199  * There are no collisions.  Why? because we generate the IDs for you to look up :-).
200  *
201  * The goal of this hash table is to map an integer ID handle > 0 to a pointer.
202  * We give these IDs in monotonic order (though we may skip if it were to cause a collision).
203  *
204  * The size of the hashtable must be large enough to accommodate the max number of keys.
205  * We suggest 2x.
206  *
207  * Readers are lockless
208  * Single writer could be lockless, but we allow multiple writers through an internal lock.
209  *
210  * For the Key type K, valid keys generated are > 0 (signed or unsigned)
211  * For the Value type V, values are pointers - nullptr means empty.
212  */
213 template <typename K, typename V>
214 class PerfectHash {
215 public:
PerfectHash(size_t hashCapacity)216     PerfectHash(size_t hashCapacity)
217         : mHashCapacity(hashCapacity)
218         , mK2V{new std::atomic<V>[hashCapacity]()} {
219     }
220 
221     // Generate a key for a value V.
222     // There is a testing function getKforV() which checks what the value reports as its key.
223     //
224     // Calls back into getKforV under lock.
225     //
226     // We expect that the hashCapacity is 2x the number of stored keys in order
227     // to have one or two tries to find an empty slot
228     K generateKey(V value, std::function<K(V)> getKforV, K oldKey = 0) {
229         std::lock_guard lock(mHashLock);
230         // try to remove the old key.
231         if (oldKey > 0) {  // key valid
232             const V v = getValue(oldKey);
233             if (v != nullptr) {  // value still valid
234                 const K atPosition = getKforV(v);
235                 if (atPosition < 0 ||            // invalid value
236                         atPosition == oldKey ||  // value's key still valid and matches old key
237                         ((atPosition ^ oldKey) & (mHashCapacity - 1)) != 0) { // stale key entry
238                     getValue(oldKey) = nullptr;  // invalidate
239                 }
240             } // else if value is invalid, no need to invalidate.
241         }
242         // check if we are invalidating only.
243         if (value == nullptr) return 0;
244         // now insert the new value and return the key.
245         size_t tries = 0;
246         for (; tries < mHashCapacity; ++tries) {
247             mNextKey = mNextKey == std::numeric_limits<K>::max() ? 1 : mNextKey + 1;
248             const V v = getValue(mNextKey);
249             //ALOGD("tries: %zu, key:%d value:%p", tries, (int)mNextKey, v);
250             if (v == nullptr) break; // empty
251             const K atPosition = getKforV(v);
252             //ALOGD("tries: %zu  key atPosition:%d", tries, (int)atPosition);
253             if (atPosition < 0 || // invalid value
254                     ((atPosition ^ mNextKey) & (mHashCapacity - 1)) != 0) { // stale key entry
255                 break;
256            }
257         }
258         LOG_ALWAYS_FATAL_IF(tries == mHashCapacity, "hash table overflow!");
259         //ALOGD("%s: found after %zu tries", __func__, tries);
260         getValue(mNextKey) = value;
261         return mNextKey;
262     }
263 
getValue(K key)264     std::atomic<V> &getValue(K key) { return mK2V[key & (mHashCapacity - 1)]; }
getValue(K key)265     const std::atomic_int32_t &getValue(K key) const { return mK2V[key & (mHashCapacity - 1)]; }
266 
267 private:
268     mutable std::mutex          mHashLock;
269     const size_t                mHashCapacity; // size of mK2V no lock needed.
270     std::unique_ptr<std::atomic<V>[]> mK2V;    // no lock needed for read access.
GUARDED_BY(mHashLock)271     K                           mNextKey GUARDED_BY(mHashLock) {};
272 };
273 
274 /**
275  * StreamMap contains the all the valid streams available to SoundPool.
276  *
277  * There is no Lock required for this class because the streams are
278  * allocated in the constructor, the lookup is lockless, and the Streams
279  * returned are locked internally.
280  *
281  * The lookup uses a perfect hash.
282  * It is possible to use a lockless hash table or to use a stripe-locked concurrent
283  * hashmap for essentially lock-free lookup.
284  *
285  * This follows Map-Reduce parallelism model.
286  * https://en.wikipedia.org/wiki/MapReduce
287  *
288  * Conceivably the forEach could be parallelized using std::for_each with a
289  * std::execution::par policy.
290  *
291  * https://en.cppreference.com/w/cpp/algorithm/for_each
292  */
293 class StreamMap {
294 public:
295     explicit StreamMap(int32_t streams);
296 
297     // Returns the stream associated with streamID or nullptr if not found.
298     // This need not be locked.
299     // The stream ID will never migrate to another Stream, but it may change
300     // underneath you.  The Stream operations that take a streamID will confirm
301     // that the streamID matches under the Stream lock before executing otherwise
302     // it ignores the command as stale.
303     Stream* findStream(int32_t streamID) const;
304 
305     // Iterates through the stream pool applying the function f.
306     // Since this enumerates over every single stream, it is unlocked.
307     //
308     // See related: https://en.cppreference.com/w/cpp/algorithm/for_each
forEach(std::function<void (const Stream *)> f)309     void forEach(std::function<void(const Stream *)>f) const {
310         for (size_t i = 0; i < mStreamPoolSize; ++i) {
311             f(&mStreamPool[i]);
312         }
313     }
314 
forEach(std::function<void (Stream *)> f)315     void forEach(std::function<void(Stream *)>f) {
316         for (size_t i = 0; i < mStreamPoolSize; ++i) {
317             f(&mStreamPool[i]);
318         }
319     }
320 
321     // Returns the pair stream for a given Stream.
322     // This need not be locked as it is a property of the pointer address.
getPairStream(const Stream * stream)323     Stream* getPairStream(const Stream* stream) const {
324         const size_t index = streamPosition(stream);
325         return &mStreamPool[index ^ 1];
326     }
327 
328     // find the position of the stream in mStreamPool array.
329     size_t streamPosition(const Stream* stream) const; // no lock needed
330 
getStreamMapSize()331     size_t getStreamMapSize() const {
332         return mStreamPoolSize;
333     }
334 
335     // find the next valid ID for a stream and store in hash table.
336     int32_t getNextIdForStream(Stream* stream) const;
337 
338 private:
339 
340     // use the hash table to attempt to find the stream.
341     // nullptr is returned if the lookup fails.
342     Stream* lookupStreamFromId(int32_t streamID) const;
343 
344     // The stream pool is initialized in the constructor, effectively const.
345     // no locking required for access.
346     //
347     // The constructor parameter "streams" results in streams pairs of streams.
348     // We have twice as many streams because we wish to return a streamID "handle"
349     // back to the app immediately, while we may be stopping the other stream in the
350     // pair to get its AudioTrack :-).
351     //
352     // Of the stream pair, only one of the streams may have an AudioTrack.
353     // The fixed association of a stream pair allows callbacks from the AudioTrack
354     // to be associated properly to either one or the other of the stream pair.
355     //
356     // TODO: The stream pair arrangement can be removed if we have better AudioTrack
357     // callback handling (being able to remove and change the callback after construction).
358     //
359     // Streams may be accessed anytime off of the stream pool
360     // as there is internal locking on each stream.
361     std::unique_ptr<Stream[]>   mStreamPool;        // no lock needed for access.
362     size_t                      mStreamPoolSize;    // no lock needed for access.
363 
364     // In order to find the Stream from a StreamID, we could do a linear lookup in mStreamPool.
365     // As an alternative, one could use stripe-locked or lock-free concurrent hashtables.
366     //
367     // When considering linear search vs hashmap, verify the typical use-case size.
368     // Linear search is faster than std::unordered_map (circa 2018) for less than 40 elements.
369     // [ Skarupke, M. (2018), "You Can Do Better than std::unordered_map: New and Recent
370     // Improvements to Hash Table Performance." C++Now 2018. cppnow.org, see
371     // https://www.youtube.com/watch?v=M2fKMP47slQ ]
372     //
373     // Here, we use a PerfectHash of Id to Stream *, since we can control the
374     // StreamID returned to the user.  This allows O(1) read access to mStreamPool lock-free.
375     //
376     // We prefer that the next stream ID is monotonic for aesthetic reasons
377     // (if we didn't care about monotonicity, a simple method is to apply a generation count
378     // to each stream in the unused upper bits of its index in mStreamPool for the id).
379     //
380     std::unique_ptr<PerfectHash<int32_t, Stream *>> mPerfectHash;
381 };
382 
383 /**
384  * StreamManager is used to manage the streams (accessed by StreamID from Java).
385  *
386  * Locking order (proceeds from application to component).
387  *  SoundPool mApiLock (if needed) -> StreamManager mStreamManagerLock
388  *                                 -> pair Stream mLock -> queued Stream mLock
389  */
390 class StreamManager : public StreamMap {
391 public:
392     // Note: the SoundPool pointer is only used for stream initialization.
393     // It is not stored in StreamManager.
394     StreamManager(int32_t streams, size_t threads, const audio_attributes_t& attributes,
395             std::string opPackageName);
396     ~StreamManager();
397 
398     // Returns positive streamID on success, 0 on failure.  This is locked.
399     int32_t queueForPlay(const std::shared_ptr<Sound> &sound,
400             int32_t soundID, float leftVolume, float rightVolume,
401             int32_t priority, int32_t loop, float rate, int32_t playerIId)
402             NO_THREAD_SAFETY_ANALYSIS; // uses unique_lock
403 
404     ///////////////////////////////////////////////////////////////////////
405     // Called from soundpool::Stream
406 
getAttributes()407     const audio_attributes_t* getAttributes() const { return &mAttributes; }
408 
getOpPackageName()409     const std::string& getOpPackageName() const { return mOpPackageName; }
410 
411     // Moves the stream to the restart queue (called upon BUFFER_END of the static track)
412     // this is locked internally.
413     // If activeStreamIDToMatch is nonzero, it will only move to the restart queue
414     // if the streamIDToMatch is found on the active queue.
415     void moveToRestartQueue(Stream* stream, int32_t activeStreamIDToMatch = 0);
416 
417 private:
418 
419     void run(int32_t id) NO_THREAD_SAFETY_ANALYSIS; // worker thread, takes unique_lock.
420     void dump() const;                           // no lock needed
421 
422     // returns true if more worker threads are needed.
needMoreThreads_l()423     bool needMoreThreads_l() REQUIRES(mStreamManagerLock) {
424         return mRestartStreams.size() > 0 &&
425                 (mThreadPool->getActiveThreadCount() == 0
426                 || std::distance(mRestartStreams.begin(),
427                         mRestartStreams.upper_bound(systemTime()))
428                         > (ptrdiff_t)mThreadPool->getActiveThreadCount());
429     }
430 
431     // returns true if the stream was added.
432     bool moveToRestartQueue_l(
433             Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock);
434     // returns number of queues the stream was removed from (should be 0 or 1);
435     // a special code of -1 is returned if activeStreamIDToMatch is > 0 and
436     // the stream wasn't found on the active queue.
437     ssize_t removeFromQueues_l(
438             Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock);
439     void addToRestartQueue_l(Stream *stream) REQUIRES(mStreamManagerLock);
440     void addToActiveQueue_l(Stream *stream) REQUIRES(mStreamManagerLock);
441     void sanityCheckQueue_l() const REQUIRES(mStreamManagerLock);
442 
443     const audio_attributes_t mAttributes;
444     const std::string mOpPackageName;
445 
446    // For legacy compatibility, we lock the stream manager on stop when
447    // there is only one stream.  This allows a play to be called immediately
448    // after stopping, otherwise it is possible that the play might be discarded
449    // (returns 0) because that stream may be in the worker thread call to stop.
450     const bool mLockStreamManagerStop;
451 
452     std::unique_ptr<ThreadPool> mThreadPool;                  // locked internally
453 
454     // mStreamManagerLock is used to lock access for transitions between the
455     // 4 stream queues by the Manager Thread or by the user initiated play().
456     // A stream pair has exactly one stream on exactly one of the queues.
457     std::mutex                  mStreamManagerLock;
458     std::condition_variable     mStreamManagerCondition GUARDED_BY(mStreamManagerLock);
459 
460     bool                        mQuit GUARDED_BY(mStreamManagerLock) = false;
461 
462     // There are constructor arg "streams" pairs of streams, only one of each
463     // pair on the 4 stream queues below.  The other stream in the pair serves as
464     // placeholder to accumulate user changes, pending actual availability of the
465     // AudioTrack, as it may be in use, requiring stop-then-restart.
466     //
467     // The 4 queues are implemented in the appropriate STL container based on perceived
468     // optimality.
469 
470     // 1) mRestartStreams: Streams awaiting stop.
471     // The paired stream may be active (but with no AudioTrack), and will be restarted
472     // with an active AudioTrack when the current stream is stopped.
473     std::multimap<int64_t /* stopTimeNs */, Stream*>
474                                 mRestartStreams GUARDED_BY(mStreamManagerLock);
475 
476     // 2) mActiveStreams: Streams that are active.
477     // The paired stream will be inactive.
478     // This is in order of specified by kStealActiveStream_OldestFirst
479     std::list<Stream*>          mActiveStreams GUARDED_BY(mStreamManagerLock);
480 
481     // 3) mAvailableStreams: Streams that are inactive.
482     // The paired stream will also be inactive.
483     // No particular order.
484     std::unordered_set<Stream*> mAvailableStreams GUARDED_BY(mStreamManagerLock);
485 
486     // 4) mProcessingStreams: Streams that are being processed by the ManagerThreads
487     // When on this queue, the stream and its pair are not available for stealing.
488     // Each ManagerThread will have at most one stream on the mProcessingStreams queue.
489     // The paired stream may be active or restarting.
490     // No particular order.
491     std::unordered_set<Stream*> mProcessingStreams GUARDED_BY(mStreamManagerLock);
492 };
493 
494 } // namespace android::soundpool
495