1 /* 2 * Copyright (C) 2025 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <deque> 20 #include <mutex> 21 22 #include "Runnable.h" 23 #include "jthread.h" 24 25 namespace android::mediautils { 26 27 /** 28 * A C++ implementation similar to a Java executor, which manages a thread which runs enqueued 29 * runnable tasks in queue order. Spawns thread on construction and joins destruction 30 */ 31 class SingleThreadExecutor { 32 public: SingleThreadExecutor()33 SingleThreadExecutor() : thread_([this](stop_token stok) { run(stok); }) {} 34 ~SingleThreadExecutor()35 ~SingleThreadExecutor() { shutdown(/* dropTasks= */ true); } 36 enqueue(Runnable r)37 void enqueue(Runnable r) { 38 if (!r) { 39 return; 40 } else { 41 std::lock_guard l{mutex_}; 42 if (thread_.stop_requested()) return; 43 task_list_.push_back(std::move(r)); 44 } 45 cv_.notify_one(); 46 } 47 48 /** 49 * Request thread termination, optionally dropping any enqueued tasks. 50 * Note: does not join thread in this method and no task cancellation. 51 */ 52 void shutdown(bool dropTasks = false) { 53 { 54 std::lock_guard l{mutex_}; 55 if (thread_.stop_requested()) return; 56 if (dropTasks) { 57 task_list_.clear(); 58 } 59 thread_.request_stop(); // fancy atomic bool, so no deadlock risk 60 } 61 // This condition variable notification is necessary since the stop_callback functionality 62 // of stop_token is not fully implemented 63 cv_.notify_one(); 64 } 65 66 67 private: run(stop_token stok)68 void run(stop_token stok) { 69 std::unique_lock l{mutex_}; 70 while (true) { 71 cv_.wait_for(l, std::chrono::seconds(3), [this, stok]() { 72 return !task_list_.empty() || stok.stop_requested(); 73 }); 74 if (!task_list_.empty()) { 75 Runnable r {std::move(task_list_.front())}; 76 task_list_.pop_front(); 77 l.unlock(); 78 r(); 79 l.lock(); 80 } else if (stok.stop_requested()) { 81 break; 82 } // else cv timeout 83 } 84 } 85 86 std::condition_variable cv_; 87 std::mutex mutex_; 88 std::deque<Runnable> task_list_; 89 jthread thread_; 90 }; 91 } // namespace android::mediautils 92