• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <cassert>
18 #include <iostream>
19 #include <limits>
20 #include <memory>
21 
22 #include "chre/platform/linux/task_util/task_manager.h"
23 
24 namespace chre {
25 
TaskManager()26 TaskManager::TaskManager()
27     : mQueue(std::greater<Task>()),
28       mCurrentTask(nullptr),
29       mContinueRunningThread(true),
30       mCurrentId(0) {
31   mThread = std::thread(&TaskManager::run, this);
32 }
33 
~TaskManager()34 TaskManager::~TaskManager() {
35   flushTasks();
36 
37   {
38     std::lock_guard<std::mutex> lock(mMutex);
39     mContinueRunningThread = false;
40     mConditionVariable.notify_all();
41   }
42 
43   if (mThread.joinable()) {
44     mThread.join();
45   }
46 }
47 
addTask(const Task::TaskFunction & func,std::chrono::milliseconds repeatInterval)48 std::optional<uint32_t> TaskManager::addTask(
49     const Task::TaskFunction &func, std::chrono::milliseconds repeatInterval) {
50   std::lock_guard<std::mutex> lock(mMutex);
51   bool success = false;
52 
53   uint32_t returnId;
54   if (!mContinueRunningThread) {
55     LOGW("Execution thread is shutting down. Cannot add a task.");
56   } else {
57     // select the next ID
58     assert(mCurrentId < std::numeric_limits<uint32_t>::max());
59     returnId = mCurrentId++;
60     Task task(func, repeatInterval, returnId);
61     success = mQueue.push(task);
62   }
63 
64   if (success) {
65     mConditionVariable.notify_all();
66     return returnId;
67   } else {
68     return std::optional<uint32_t>();
69   }
70 }
71 
cancelTask(uint32_t taskId)72 bool TaskManager::cancelTask(uint32_t taskId) {
73   std::lock_guard<std::mutex> lock(mMutex);
74 
75   bool success = false;
76   if (!mContinueRunningThread) {
77     LOGW("Execution thread is shutting down. Cannot cancel a task.");
78   } else if (mCurrentTask != nullptr && mCurrentTask->getId() == taskId) {
79     // The currently executing task may want to cancel itself.
80     mCurrentTask->cancel();
81     success = true;
82   } else {
83     for (auto iter = mQueue.begin(); iter != mQueue.end(); ++iter) {
84       if (iter->getId() == taskId) {
85         iter->cancel();
86         success = true;
87         break;
88       }
89     }
90   }
91 
92   return success;
93 }
94 
flushTasks()95 void TaskManager::flushTasks() {
96   std::lock_guard<std::mutex> lock(mMutex);
97   while (!mQueue.empty()) {
98     mQueue.pop();
99   }
100 }
101 
run()102 void TaskManager::run() {
103   while (true) {
104     Task task;
105     {
106       std::unique_lock<std::mutex> lock(mMutex);
107       mConditionVariable.wait(lock, [this]() {
108         return !mContinueRunningThread || !mQueue.empty();
109       });
110       if (!mContinueRunningThread) {
111         return;
112       }
113 
114       task = mQueue.top();
115       if (!task.isReadyToExecute()) {
116         auto waitTime =
117             task.getExecutionTimestamp() - std::chrono::steady_clock::now();
118         if (waitTime.count() > 0) {
119           mConditionVariable.wait_for(lock, waitTime);
120         }
121 
122         /**
123          * We continue here instead of executing the same task because we are
124          * not guaranteed that the condition variable was not spuriously woken
125          * up, and another task with a timestamp < the current task could have
126          * been added in the current time.
127          */
128         continue;
129       }
130 
131       mQueue.pop();
132       mCurrentTask = &task;
133     }
134     task.execute();
135     {
136       std::lock_guard<std::mutex> lock(mMutex);
137       mCurrentTask = nullptr;
138 
139       if (task.isRepeating() && !mQueue.push(task)) {
140         LOGE("TaskManager: Could not push task to priority queue");
141       }
142     }
143   }
144 }
145 
146 }  // namespace chre
147