1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <cassert>
18 #include <iostream>
19 #include <limits>
20 #include <memory>
21
22 #include "chre/platform/linux/task_util/task_manager.h"
23
24 namespace chre {
25
TaskManager()26 TaskManager::TaskManager()
27 : mQueue(std::greater<Task>()),
28 mCurrentTask(nullptr),
29 mContinueRunningThread(true),
30 mCurrentId(0) {
31 mThread = std::thread(&TaskManager::run, this);
32 }
33
~TaskManager()34 TaskManager::~TaskManager() {
35 flushTasks();
36
37 {
38 std::lock_guard<std::mutex> lock(mMutex);
39 mContinueRunningThread = false;
40 mConditionVariable.notify_all();
41 }
42
43 if (mThread.joinable()) {
44 mThread.join();
45 }
46 }
47
addTask(const Task::TaskFunction & func,std::chrono::milliseconds repeatInterval)48 std::optional<uint32_t> TaskManager::addTask(
49 const Task::TaskFunction &func, std::chrono::milliseconds repeatInterval) {
50 std::lock_guard<std::mutex> lock(mMutex);
51 bool success = false;
52
53 uint32_t returnId;
54 if (!mContinueRunningThread) {
55 LOGW("Execution thread is shutting down. Cannot add a task.");
56 } else {
57 // select the next ID
58 assert(mCurrentId < std::numeric_limits<uint32_t>::max());
59 returnId = mCurrentId++;
60 Task task(func, repeatInterval, returnId);
61 success = mQueue.push(task);
62 }
63
64 if (success) {
65 mConditionVariable.notify_all();
66 return returnId;
67 } else {
68 return std::optional<uint32_t>();
69 }
70 }
71
cancelTask(uint32_t taskId)72 bool TaskManager::cancelTask(uint32_t taskId) {
73 std::lock_guard<std::mutex> lock(mMutex);
74
75 bool success = false;
76 if (!mContinueRunningThread) {
77 LOGW("Execution thread is shutting down. Cannot cancel a task.");
78 } else if (mCurrentTask != nullptr && mCurrentTask->getId() == taskId) {
79 // The currently executing task may want to cancel itself.
80 mCurrentTask->cancel();
81 success = true;
82 } else {
83 for (auto iter = mQueue.begin(); iter != mQueue.end(); ++iter) {
84 if (iter->getId() == taskId) {
85 iter->cancel();
86 success = true;
87 break;
88 }
89 }
90 }
91
92 return success;
93 }
94
flushTasks()95 void TaskManager::flushTasks() {
96 std::lock_guard<std::mutex> lock(mMutex);
97 while (!mQueue.empty()) {
98 mQueue.pop();
99 }
100 }
101
run()102 void TaskManager::run() {
103 while (true) {
104 Task task;
105 {
106 std::unique_lock<std::mutex> lock(mMutex);
107 mConditionVariable.wait(lock, [this]() {
108 return !mContinueRunningThread || !mQueue.empty();
109 });
110 if (!mContinueRunningThread) {
111 return;
112 }
113
114 task = mQueue.top();
115 if (!task.isReadyToExecute()) {
116 auto waitTime =
117 task.getExecutionTimestamp() - std::chrono::steady_clock::now();
118 if (waitTime.count() > 0) {
119 mConditionVariable.wait_for(lock, waitTime);
120 }
121
122 /**
123 * We continue here instead of executing the same task because we are
124 * not guaranteed that the condition variable was not spuriously woken
125 * up, and another task with a timestamp < the current task could have
126 * been added in the current time.
127 */
128 continue;
129 }
130
131 mQueue.pop();
132 mCurrentTask = &task;
133 }
134 task.execute();
135 {
136 std::lock_guard<std::mutex> lock(mMutex);
137 mCurrentTask = nullptr;
138
139 if (task.isRepeating() && !mQueue.push(task)) {
140 LOGE("TaskManager: Could not push task to priority queue");
141 }
142 }
143 }
144 }
145
146 } // namespace chre
147