1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <sched.h> 20 21 #include <condition_variable> 22 #include <mutex> 23 #include <thread> 24 25 template <typename Impl> 26 class StreamWorker { 27 enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR }; 28 29 public: 30 StreamWorker() = default; ~StreamWorker()31 ~StreamWorker() { stop(); } start()32 bool start() { 33 mWorker = std::thread(&StreamWorker::workerThread, this); 34 std::unique_lock<std::mutex> lock(mWorkerLock); 35 mWorkerCv.wait(lock, [&] { return mWorkerState != WorkerState::STOPPED; }); 36 return mWorkerState == WorkerState::RUNNING; 37 } pause()38 void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); } resume()39 void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } hasError()40 bool hasError() { 41 std::lock_guard<std::mutex> lock(mWorkerLock); 42 return mWorkerState == WorkerState::ERROR; 43 } stop()44 void stop() { 45 { 46 std::lock_guard<std::mutex> lock(mWorkerLock); 47 if (mWorkerState == WorkerState::STOPPED) return; 48 mWorkerState = WorkerState::STOPPED; 49 } 50 if (mWorker.joinable()) { 51 mWorker.join(); 52 } 53 } waitForAtLeastOneCycle()54 bool waitForAtLeastOneCycle() { 55 WorkerState newState; 56 switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState); 57 if (newState != WorkerState::PAUSED) return false; 58 switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState); 59 return newState == WorkerState::RUNNING; 60 } 61 62 // Methods that need to be provided by subclasses: 63 // 64 // Called once at the beginning of the thread loop. Must return 65 // 'true' to enter the thread loop, otherwise the thread loop 66 // exits and the worker switches into the 'error' state. 67 // bool workerInit(); 68 // 69 // Called for each thread loop unless the thread is in 'paused' state. 70 // Must return 'true' to continue running, otherwise the thread loop 71 // exits and the worker switches into the 'error' state. 72 // bool workerCycle(); 73 74 private: 75 void switchWorkerStateSync(WorkerState oldState, WorkerState newState, 76 WorkerState* finalState = nullptr) { 77 std::unique_lock<std::mutex> lock(mWorkerLock); 78 if (mWorkerState != oldState) { 79 if (finalState) *finalState = mWorkerState; 80 return; 81 } 82 mWorkerState = newState; 83 mWorkerCv.wait(lock, [&] { return mWorkerState != newState; }); 84 if (finalState) *finalState = mWorkerState; 85 } workerThread()86 void workerThread() { 87 bool success = static_cast<Impl*>(this)->workerInit(); 88 { 89 std::lock_guard<std::mutex> lock(mWorkerLock); 90 mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR; 91 } 92 mWorkerCv.notify_one(); 93 if (!success) return; 94 95 for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { 96 bool needToNotify = false; 97 if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle() 98 : (sched_yield(), true)) { 99 // 100 // Pause and resume are synchronous. One worker cycle must complete 101 // before the worker indicates a state change. This is how 'mWorkerState' and 102 // 'state' interact: 103 // 104 // mWorkerState == RUNNING 105 // client sets mWorkerState := PAUSE_REQUESTED 106 // last workerCycle gets executed, state := mWorkerState := PAUSED by us 107 // (or the workers enters the 'error' state if workerCycle fails) 108 // client gets notified about state change in any case 109 // thread is doing a busy wait while 'state == PAUSED' 110 // client sets mWorkerState := RESUME_REQUESTED 111 // state := mWorkerState (RESUME_REQUESTED) 112 // mWorkerState := RUNNING, but we don't notify the client yet 113 // first workerCycle gets executed, the code below triggers a client notification 114 // (or if workerCycle fails, worker enters 'error' state and also notifies) 115 // state := mWorkerState (RUNNING) 116 if (state == WorkerState::RESUME_REQUESTED) { 117 needToNotify = true; 118 } 119 std::lock_guard<std::mutex> lock(mWorkerLock); 120 state = mWorkerState; 121 if (mWorkerState == WorkerState::PAUSE_REQUESTED) { 122 state = mWorkerState = WorkerState::PAUSED; 123 needToNotify = true; 124 } else if (mWorkerState == WorkerState::RESUME_REQUESTED) { 125 mWorkerState = WorkerState::RUNNING; 126 } 127 } else { 128 std::lock_guard<std::mutex> lock(mWorkerLock); 129 if (state == WorkerState::RESUME_REQUESTED || 130 mWorkerState == WorkerState::PAUSE_REQUESTED) { 131 needToNotify = true; 132 } 133 mWorkerState = WorkerState::ERROR; 134 state = WorkerState::STOPPED; 135 } 136 if (needToNotify) { 137 mWorkerCv.notify_one(); 138 } 139 } 140 } 141 142 std::thread mWorker; 143 std::mutex mWorkerLock; 144 std::condition_variable mWorkerCv; 145 WorkerState mWorkerState = WorkerState::STOPPED; // GUARDED_BY(mWorkerLock); 146 }; 147