1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include "Stream.h" 20 21 #include <condition_variable> 22 #include <future> 23 #include <list> 24 #include <map> 25 #include <memory> 26 #include <mutex> 27 #include <string> 28 #include <unordered_set> 29 #include <vector> 30 31 #include <utils/AndroidThreads.h> 32 33 namespace android::soundpool { 34 35 // TODO: Move helper classes to a utility file, with separate test. 36 37 /** 38 * JavaThread is used like std::thread but for threads that may call the JVM. 39 * 40 * std::thread does not easily attach to the JVM. We need JVM capable threads 41 * from createThreadEtc() since android binder call optimization may attempt to 42 * call back into Java if the SoundPool runs in system server. 43 * 44 * 45 * No locking is required - the member variables are inherently thread-safe. 46 */ 47 class JavaThread { 48 public: JavaThread(std::function<void ()> f,const char * name,int32_t threadPriority)49 JavaThread(std::function<void()> f, const char *name, int32_t threadPriority) 50 : mF{std::move(f)} { 51 createThreadEtc(staticFunction, this, name, threadPriority); 52 } 53 54 JavaThread(JavaThread &&) = delete; // uses "this" ptr, not moveable. 55 ~JavaThread()56 ~JavaThread() { 57 join(); // manually block until the future is ready as std::future 58 // destructor doesn't block unless it comes from std::async 59 // and it is the last reference to shared state. 60 } 61 join()62 void join() const { 63 mFuture.wait(); 64 } 65 isClosed()66 bool isClosed() const { 67 return mIsClosed; 68 } 69 70 private: staticFunction(void * data)71 static int staticFunction(void *data) { 72 JavaThread *jt = static_cast<JavaThread *>(data); 73 jt->mF(); 74 jt->mIsClosed = true; // set the flag that we are closed 75 // now before we allow the destructor to execute; 76 // otherwise there may be a use after free. 77 jt->mPromise.set_value(); 78 return 0; 79 } 80 81 // No locking is provided as these variables are initialized in the constructor 82 // and the members referenced are thread-safe objects. 83 // (mFuture.wait() can block multiple threads.) 84 // Note the order of member variables is reversed for destructor. 85 const std::function<void()> mF; 86 // Used in join() to block until the thread completes. 87 // See https://en.cppreference.com/w/cpp/thread/promise for the void specialization of 88 // promise. 89 std::promise<void> mPromise; 90 std::future<void> mFuture{mPromise.get_future()}; 91 std::atomic_bool mIsClosed = false; 92 }; 93 94 /** 95 * The ThreadPool manages thread lifetimes of SoundPool worker threads. 96 * 97 * TODO: the (eventual) goal of ThreadPool is to transparently and cooperatively 98 * maximize CPU utilization while avoiding starvation of other applications. 99 * Some possibilities: 100 * 101 * We should create worker threads when we have SoundPool work and the system is idle. 102 * CPU cycles are "use-it-or-lose-it" when the system is idle. 103 * 104 * We should adjust the priority of worker threads so that the second (and subsequent) worker 105 * threads have lower priority (should we try to promote priority also?). 106 * 107 * We should throttle the spawning of new worker threads, spacing over time, to avoid 108 * creating too many new threads all at once, on initialization. 109 */ 110 class ThreadPool { 111 public: 112 ThreadPool(size_t maxThreadCount, std::string name, 113 int32_t threadPriority = ANDROID_PRIORITY_NORMAL) mMaxThreadCount(maxThreadCount)114 : mMaxThreadCount(maxThreadCount) 115 , mName{std::move(name)} 116 , mThreadPriority(threadPriority) {} 117 ~ThreadPool()118 ~ThreadPool() { quit(); } 119 getActiveThreadCount()120 size_t getActiveThreadCount() const { return mActiveThreadCount; } getMaxThreadCount()121 size_t getMaxThreadCount() const { return mMaxThreadCount; } 122 quit()123 void quit() { 124 std::list<std::unique_ptr<JavaThread>> threads; 125 { 126 std::lock_guard lock(mThreadLock); 127 if (mQuit) return; // already joined. 128 mQuit = true; 129 threads = std::move(mThreads); 130 mThreads.clear(); 131 } 132 // mQuit set under lock, no more threads will be created. 133 for (auto &thread : threads) { 134 thread->join(); 135 thread.reset(); 136 } 137 LOG_ALWAYS_FATAL_IF(mActiveThreadCount != 0, 138 "Invalid Active Threads: %zu", (size_t)mActiveThreadCount); 139 } 140 141 // returns a non-zero id if successful, the id is to help logging messages. launch(std::function<void (int32_t)> f)142 int32_t launch(std::function<void(int32_t /* id */)> f) { 143 std::list<std::unique_ptr<JavaThread>> threadsToRelease; // release outside of lock. 144 std::lock_guard lock(mThreadLock); 145 if (mQuit) return 0; // ignore if we have quit 146 147 // clean up threads. 148 for (auto it = mThreads.begin(); it != mThreads.end(); ) { 149 if ((*it)->isClosed()) { 150 threadsToRelease.emplace_back(std::move(*it)); 151 it = mThreads.erase(it); 152 } else { 153 ++it; 154 } 155 } 156 157 const size_t threadCount = mThreads.size(); 158 if (threadCount < mMaxThreadCount) { 159 // if the id wraps, we don't care about collisions. it's just for logging. 160 mNextThreadId = mNextThreadId == INT32_MAX ? 1 : ++mNextThreadId; 161 const int32_t id = mNextThreadId; 162 mThreads.emplace_back(std::make_unique<JavaThread>( 163 [this, id, mf = std::move(f)] { mf(id); --mActiveThreadCount; }, 164 (mName + std::to_string(id)).c_str(), 165 mThreadPriority)); 166 ++mActiveThreadCount; 167 return id; 168 } 169 return 0; 170 } 171 172 // TODO: launch only if load average is low. 173 // This gets the load average 174 // See also std::thread::hardware_concurrency() for the concurrent capability. getLoadAvg()175 static double getLoadAvg() { 176 double loadAvg[1]; 177 if (getloadavg(loadAvg, std::size(loadAvg)) > 0) { 178 return loadAvg[0]; 179 } 180 return -1.; 181 } 182 183 private: 184 const size_t mMaxThreadCount; 185 const std::string mName; 186 const int32_t mThreadPriority; 187 188 std::atomic_size_t mActiveThreadCount = 0; 189 190 std::mutex mThreadLock; 191 bool mQuit GUARDED_BY(mThreadLock) = false; 192 int32_t mNextThreadId GUARDED_BY(mThreadLock) = 0; 193 std::list<std::unique_ptr<JavaThread>> mThreads GUARDED_BY(mThreadLock); 194 }; 195 196 /** 197 * A Perfect HashTable for IDs (key) to pointers (value). 198 * 199 * There are no collisions. Why? because we generate the IDs for you to look up :-). 200 * 201 * The goal of this hash table is to map an integer ID handle > 0 to a pointer. 202 * We give these IDs in monotonic order (though we may skip if it were to cause a collision). 203 * 204 * The size of the hashtable must be large enough to accommodate the max number of keys. 205 * We suggest 2x. 206 * 207 * Readers are lockless 208 * Single writer could be lockless, but we allow multiple writers through an internal lock. 209 * 210 * For the Key type K, valid keys generated are > 0 (signed or unsigned) 211 * For the Value type V, values are pointers - nullptr means empty. 212 */ 213 template <typename K, typename V> 214 class PerfectHash { 215 public: PerfectHash(size_t hashCapacity)216 PerfectHash(size_t hashCapacity) 217 : mHashCapacity(hashCapacity) 218 , mK2V{new std::atomic<V>[hashCapacity]()} { 219 } 220 221 // Generate a key for a value V. 222 // There is a testing function getKforV() which checks what the value reports as its key. 223 // 224 // Calls back into getKforV under lock. 225 // 226 // We expect that the hashCapacity is 2x the number of stored keys in order 227 // to have one or two tries to find an empty slot 228 K generateKey(V value, std::function<K(V)> getKforV, K oldKey = 0) { 229 std::lock_guard lock(mHashLock); 230 // try to remove the old key. 231 if (oldKey > 0) { // key valid 232 const V v = getValue(oldKey); 233 if (v != nullptr) { // value still valid 234 const K atPosition = getKforV(v); 235 if (atPosition < 0 || // invalid value 236 atPosition == oldKey || // value's key still valid and matches old key 237 ((atPosition ^ oldKey) & (mHashCapacity - 1)) != 0) { // stale key entry 238 getValue(oldKey) = nullptr; // invalidate 239 } 240 } // else if value is invalid, no need to invalidate. 241 } 242 // check if we are invalidating only. 243 if (value == nullptr) return 0; 244 // now insert the new value and return the key. 245 size_t tries = 0; 246 for (; tries < mHashCapacity; ++tries) { 247 mNextKey = mNextKey == std::numeric_limits<K>::max() ? 1 : mNextKey + 1; 248 const V v = getValue(mNextKey); 249 //ALOGD("tries: %zu, key:%d value:%p", tries, (int)mNextKey, v); 250 if (v == nullptr) break; // empty 251 const K atPosition = getKforV(v); 252 //ALOGD("tries: %zu key atPosition:%d", tries, (int)atPosition); 253 if (atPosition < 0 || // invalid value 254 ((atPosition ^ mNextKey) & (mHashCapacity - 1)) != 0) { // stale key entry 255 break; 256 } 257 } 258 LOG_ALWAYS_FATAL_IF(tries == mHashCapacity, "hash table overflow!"); 259 //ALOGD("%s: found after %zu tries", __func__, tries); 260 getValue(mNextKey) = value; 261 return mNextKey; 262 } 263 getValue(K key)264 std::atomic<V> &getValue(K key) { return mK2V[key & (mHashCapacity - 1)]; } getValue(K key)265 const std::atomic_int32_t &getValue(K key) const { return mK2V[key & (mHashCapacity - 1)]; } 266 267 private: 268 mutable std::mutex mHashLock; 269 const size_t mHashCapacity; // size of mK2V no lock needed. 270 std::unique_ptr<std::atomic<V>[]> mK2V; // no lock needed for read access. GUARDED_BY(mHashLock)271 K mNextKey GUARDED_BY(mHashLock) {}; 272 }; 273 274 /** 275 * StreamMap contains the all the valid streams available to SoundPool. 276 * 277 * There is no Lock required for this class because the streams are 278 * allocated in the constructor, the lookup is lockless, and the Streams 279 * returned are locked internally. 280 * 281 * The lookup uses a perfect hash. 282 * It is possible to use a lockless hash table or to use a stripe-locked concurrent 283 * hashmap for essentially lock-free lookup. 284 * 285 * This follows Map-Reduce parallelism model. 286 * https://en.wikipedia.org/wiki/MapReduce 287 * 288 * Conceivably the forEach could be parallelized using std::for_each with a 289 * std::execution::par policy. 290 * 291 * https://en.cppreference.com/w/cpp/algorithm/for_each 292 */ 293 class StreamMap { 294 public: 295 explicit StreamMap(int32_t streams); 296 297 // Returns the stream associated with streamID or nullptr if not found. 298 // This need not be locked. 299 // The stream ID will never migrate to another Stream, but it may change 300 // underneath you. The Stream operations that take a streamID will confirm 301 // that the streamID matches under the Stream lock before executing otherwise 302 // it ignores the command as stale. 303 Stream* findStream(int32_t streamID) const; 304 305 // Iterates through the stream pool applying the function f. 306 // Since this enumerates over every single stream, it is unlocked. 307 // 308 // See related: https://en.cppreference.com/w/cpp/algorithm/for_each forEach(std::function<void (const Stream *)> f)309 void forEach(std::function<void(const Stream *)>f) const { 310 for (size_t i = 0; i < mStreamPoolSize; ++i) { 311 f(&mStreamPool[i]); 312 } 313 } 314 forEach(std::function<void (Stream *)> f)315 void forEach(std::function<void(Stream *)>f) { 316 for (size_t i = 0; i < mStreamPoolSize; ++i) { 317 f(&mStreamPool[i]); 318 } 319 } 320 321 // Returns the pair stream for a given Stream. 322 // This need not be locked as it is a property of the pointer address. getPairStream(const Stream * stream)323 Stream* getPairStream(const Stream* stream) const { 324 const size_t index = streamPosition(stream); 325 return &mStreamPool[index ^ 1]; 326 } 327 328 // find the position of the stream in mStreamPool array. 329 size_t streamPosition(const Stream* stream) const; // no lock needed 330 getStreamMapSize()331 size_t getStreamMapSize() const { 332 return mStreamPoolSize; 333 } 334 335 // find the next valid ID for a stream and store in hash table. 336 int32_t getNextIdForStream(Stream* stream) const; 337 338 private: 339 340 // use the hash table to attempt to find the stream. 341 // nullptr is returned if the lookup fails. 342 Stream* lookupStreamFromId(int32_t streamID) const; 343 344 // The stream pool is initialized in the constructor, effectively const. 345 // no locking required for access. 346 // 347 // The constructor parameter "streams" results in streams pairs of streams. 348 // We have twice as many streams because we wish to return a streamID "handle" 349 // back to the app immediately, while we may be stopping the other stream in the 350 // pair to get its AudioTrack :-). 351 // 352 // Of the stream pair, only one of the streams may have an AudioTrack. 353 // The fixed association of a stream pair allows callbacks from the AudioTrack 354 // to be associated properly to either one or the other of the stream pair. 355 // 356 // TODO: The stream pair arrangement can be removed if we have better AudioTrack 357 // callback handling (being able to remove and change the callback after construction). 358 // 359 // Streams may be accessed anytime off of the stream pool 360 // as there is internal locking on each stream. 361 std::unique_ptr<Stream[]> mStreamPool; // no lock needed for access. 362 size_t mStreamPoolSize; // no lock needed for access. 363 364 // In order to find the Stream from a StreamID, we could do a linear lookup in mStreamPool. 365 // As an alternative, one could use stripe-locked or lock-free concurrent hashtables. 366 // 367 // When considering linear search vs hashmap, verify the typical use-case size. 368 // Linear search is faster than std::unordered_map (circa 2018) for less than 40 elements. 369 // [ Skarupke, M. (2018), "You Can Do Better than std::unordered_map: New and Recent 370 // Improvements to Hash Table Performance." C++Now 2018. cppnow.org, see 371 // https://www.youtube.com/watch?v=M2fKMP47slQ ] 372 // 373 // Here, we use a PerfectHash of Id to Stream *, since we can control the 374 // StreamID returned to the user. This allows O(1) read access to mStreamPool lock-free. 375 // 376 // We prefer that the next stream ID is monotonic for aesthetic reasons 377 // (if we didn't care about monotonicity, a simple method is to apply a generation count 378 // to each stream in the unused upper bits of its index in mStreamPool for the id). 379 // 380 std::unique_ptr<PerfectHash<int32_t, Stream *>> mPerfectHash; 381 }; 382 383 /** 384 * StreamManager is used to manage the streams (accessed by StreamID from Java). 385 * 386 * Locking order (proceeds from application to component). 387 * SoundPool mApiLock (if needed) -> StreamManager mStreamManagerLock 388 * -> pair Stream mLock -> queued Stream mLock 389 */ 390 class StreamManager : public StreamMap { 391 public: 392 // Note: the SoundPool pointer is only used for stream initialization. 393 // It is not stored in StreamManager. 394 StreamManager(int32_t streams, size_t threads, const audio_attributes_t& attributes, 395 std::string opPackageName); 396 ~StreamManager(); 397 398 // Returns positive streamID on success, 0 on failure. This is locked. 399 int32_t queueForPlay(const std::shared_ptr<Sound> &sound, 400 int32_t soundID, float leftVolume, float rightVolume, 401 int32_t priority, int32_t loop, float rate, int32_t playerIId) 402 NO_THREAD_SAFETY_ANALYSIS; // uses unique_lock 403 404 /////////////////////////////////////////////////////////////////////// 405 // Called from soundpool::Stream 406 getAttributes()407 const audio_attributes_t* getAttributes() const { return &mAttributes; } 408 getOpPackageName()409 const std::string& getOpPackageName() const { return mOpPackageName; } 410 411 // Moves the stream to the restart queue (called upon BUFFER_END of the static track) 412 // this is locked internally. 413 // If activeStreamIDToMatch is nonzero, it will only move to the restart queue 414 // if the streamIDToMatch is found on the active queue. 415 void moveToRestartQueue(Stream* stream, int32_t activeStreamIDToMatch = 0); 416 417 private: 418 419 void run(int32_t id) NO_THREAD_SAFETY_ANALYSIS; // worker thread, takes unique_lock. 420 void dump() const; // no lock needed 421 422 // returns true if more worker threads are needed. needMoreThreads_l()423 bool needMoreThreads_l() REQUIRES(mStreamManagerLock) { 424 return mRestartStreams.size() > 0 && 425 (mThreadPool->getActiveThreadCount() == 0 426 || std::distance(mRestartStreams.begin(), 427 mRestartStreams.upper_bound(systemTime())) 428 > (ptrdiff_t)mThreadPool->getActiveThreadCount()); 429 } 430 431 // returns true if the stream was added. 432 bool moveToRestartQueue_l( 433 Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock); 434 // returns number of queues the stream was removed from (should be 0 or 1); 435 // a special code of -1 is returned if activeStreamIDToMatch is > 0 and 436 // the stream wasn't found on the active queue. 437 ssize_t removeFromQueues_l( 438 Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock); 439 void addToRestartQueue_l(Stream *stream) REQUIRES(mStreamManagerLock); 440 void addToActiveQueue_l(Stream *stream) REQUIRES(mStreamManagerLock); 441 void sanityCheckQueue_l() const REQUIRES(mStreamManagerLock); 442 443 const audio_attributes_t mAttributes; 444 const std::string mOpPackageName; 445 446 // For legacy compatibility, we lock the stream manager on stop when 447 // there is only one stream. This allows a play to be called immediately 448 // after stopping, otherwise it is possible that the play might be discarded 449 // (returns 0) because that stream may be in the worker thread call to stop. 450 const bool mLockStreamManagerStop; 451 452 std::unique_ptr<ThreadPool> mThreadPool; // locked internally 453 454 // mStreamManagerLock is used to lock access for transitions between the 455 // 4 stream queues by the Manager Thread or by the user initiated play(). 456 // A stream pair has exactly one stream on exactly one of the queues. 457 std::mutex mStreamManagerLock; 458 std::condition_variable mStreamManagerCondition GUARDED_BY(mStreamManagerLock); 459 460 bool mQuit GUARDED_BY(mStreamManagerLock) = false; 461 462 // There are constructor arg "streams" pairs of streams, only one of each 463 // pair on the 4 stream queues below. The other stream in the pair serves as 464 // placeholder to accumulate user changes, pending actual availability of the 465 // AudioTrack, as it may be in use, requiring stop-then-restart. 466 // 467 // The 4 queues are implemented in the appropriate STL container based on perceived 468 // optimality. 469 470 // 1) mRestartStreams: Streams awaiting stop. 471 // The paired stream may be active (but with no AudioTrack), and will be restarted 472 // with an active AudioTrack when the current stream is stopped. 473 std::multimap<int64_t /* stopTimeNs */, Stream*> 474 mRestartStreams GUARDED_BY(mStreamManagerLock); 475 476 // 2) mActiveStreams: Streams that are active. 477 // The paired stream will be inactive. 478 // This is in order of specified by kStealActiveStream_OldestFirst 479 std::list<Stream*> mActiveStreams GUARDED_BY(mStreamManagerLock); 480 481 // 3) mAvailableStreams: Streams that are inactive. 482 // The paired stream will also be inactive. 483 // No particular order. 484 std::unordered_set<Stream*> mAvailableStreams GUARDED_BY(mStreamManagerLock); 485 486 // 4) mProcessingStreams: Streams that are being processed by the ManagerThreads 487 // When on this queue, the stream and its pair are not available for stealing. 488 // Each ManagerThread will have at most one stream on the mProcessingStreams queue. 489 // The paired stream may be active or restarting. 490 // No particular order. 491 std::unordered_set<Stream*> mProcessingStreams GUARDED_BY(mStreamManagerLock); 492 }; 493 494 } // namespace android::soundpool 495