• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2025 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

#include "Runnable.h"
#include "jthread.h"
24 
25 namespace android::mediautils {
26 
/**
 * A C++ implementation similar to a Java executor, which manages a single thread that runs
 * enqueued runnable tasks in queue order. Spawns the thread on construction and joins it on
 * destruction.
 */
31 class SingleThreadExecutor {
32   public:
SingleThreadExecutor()33     SingleThreadExecutor() : thread_([this](stop_token stok) { run(stok); }) {}
34 
~SingleThreadExecutor()35     ~SingleThreadExecutor() { shutdown(/* dropTasks= */ true); }
36 
enqueue(Runnable r)37     void enqueue(Runnable r) {
38         if (!r) {
39             return;
40         } else {
41             std::lock_guard l{mutex_};
42             if (thread_.stop_requested()) return;
43             task_list_.push_back(std::move(r));
44         }
45         cv_.notify_one();
46     }
47 
48     /**
49      * Request thread termination, optionally dropping any enqueued tasks.
50      * Note: does not join thread in this method and no task cancellation.
51      */
52     void shutdown(bool dropTasks = false) {
53         {
54             std::lock_guard l{mutex_};
55             if (thread_.stop_requested()) return;
56             if (dropTasks) {
57                 task_list_.clear();
58             }
59             thread_.request_stop();  // fancy atomic bool, so no deadlock risk
60         }
61         // This condition variable notification is necessary since the stop_callback functionality
62         // of stop_token is not fully implemented
63         cv_.notify_one();
64     }
65 
66 
67   private:
run(stop_token stok)68     void run(stop_token stok) {
69         std::unique_lock l{mutex_};
70         while (true) {
71             cv_.wait_for(l, std::chrono::seconds(3), [this, stok]() {
72                 return !task_list_.empty() || stok.stop_requested();
73             });
74             if (!task_list_.empty()) {
75                 Runnable r {std::move(task_list_.front())};
76                 task_list_.pop_front();
77                 l.unlock();
78                 r();
79                 l.lock();
80             } else if (stok.stop_requested()) {
81                 break;
82             } // else cv timeout
83         }
84     }
85 
86     std::condition_variable cv_;
87     std::mutex mutex_;
88     std::deque<Runnable> task_list_;
89     jthread thread_;
90 };
91 }  // namespace android::mediautils
92