• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "thread_pool.h"
16 #include <cstring>
17 #include "script_utils.h"
18 #include <unistd.h>
19 
20 namespace Uscript {
21 static thread_local float g_scriptProportion = 1.0f;
22 static ThreadPool* g_threadPool = nullptr;
23 static std::mutex g_initMutex;
24 static std::atomic<float> g_totalProportion(1.0f);
25 
SetScriptProportion(float proportion)26 void SetScriptProportion(float proportion)
27 {
28     g_scriptProportion = proportion;
29 }
30 
GetScriptProportion()31 float GetScriptProportion()
32 {
33     return g_scriptProportion;
34 }
35 
SetTotalProportion(float proportion)36 void SetTotalProportion(float proportion)
37 {
38     g_totalProportion.store(proportion);
39 }
40 
GetTotalProportion()41 float GetTotalProportion()
42 {
43     return g_totalProportion.load();
44 }
45 
CreateThreadPool(int number)46 ThreadPool* ThreadPool::CreateThreadPool(int number)
47 {
48     std::lock_guard<std::mutex> lock(g_initMutex);
49     if (g_threadPool != nullptr) {
50         return g_threadPool;
51     }
52     g_threadPool = new ThreadPool();
53     g_threadPool->Init(number);
54     return g_threadPool;
55 }
56 
Destroy()57 void ThreadPool::Destroy()
58 {
59     if (g_threadPool == nullptr) {
60         return;
61     }
62     std::lock_guard<std::mutex> lock(g_initMutex);
63     delete g_threadPool;
64     g_threadPool = nullptr;
65 }
66 
Init(int32_t number)67 void ThreadPool::Init(int32_t number)
68 {
69     threadNumber_ = number;
70     taskQueue_.resize(threadPoolMaxTasks);
71     for (size_t t = 0; t < taskQueue_.size(); ++t) {
72         taskQueue_[t].available = true;
73         for (int32_t i = 0; i < threadNumber_; ++i) {
74             taskQueue_[t].subTaskFlag.emplace_back(new std::atomic_bool { false });
75         }
76     }
77     // Create workers
78     for (int32_t threadIndex = 0; threadIndex < threadNumber_; ++threadIndex) {
79         workers_.emplace_back(std::thread([this, threadIndex] {
80             ThreadExecute(this, threadIndex);
81             }));
82     }
83 }
84 
ThreadRun(int32_t threadIndex)85 void ThreadPool::ThreadRun(int32_t threadIndex)
86 {
87     USCRIPT_LOGI("Create new thread successfully, tid: %d", gettid());
88     while (!stop_) {
89         for (int32_t k = 0; k < threadPoolMaxTasks; ++k) {
90             if (*taskQueue_[k].subTaskFlag[threadIndex]) {
91                 taskQueue_[k].task.processor(threadIndex);
92                 *taskQueue_[k].subTaskFlag[threadIndex] = false;
93             }
94         }
95         std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
96     }
97 }
98 
~ThreadPool()99 ThreadPool::~ThreadPool()
100 {
101     {
102         std::lock_guard<std::mutex> lock(queueMutex_);
103         stop_ = true;
104     }
105     for (auto& worker : workers_) {
106         worker.join();
107     }
108     for (auto& task : taskQueue_) {
109         for (auto c : task.subTaskFlag) {
110             delete c;
111         }
112     }
113 }
114 
AddTask(Task && task)115 void ThreadPool::AddTask(Task &&task)
116 {
117     if (g_threadPool != nullptr) {
118         g_threadPool->AddNewTask(std::move(task));
119     }
120 }
121 
AddNewTask(Task && task)122 void ThreadPool::AddNewTask(Task &&task)
123 {
124     int32_t index = AcquireWorkIndex();
125     if (index < 0) {
126         USCRIPT_LOGI("ThreadPool::AddNewTask Failed");
127         return;
128     }
129 
130     RunTask(std::move(task), index);
131     // Works done. make this task available
132     std::lock_guard<std::mutex> lock(queueMutex_);
133     taskQueue_[index].available = true;
134 }
135 
AcquireWorkIndex()136 int32_t ThreadPool::AcquireWorkIndex()
137 {
138     std::lock_guard<std::mutex> lock(queueMutex_);
139     for (int32_t i = 0; i < threadPoolMaxTasks; ++i) {
140         if (taskQueue_[i].available) {
141             taskQueue_[i].available = false;
142             return i;
143         }
144     }
145     return -1;
146 }
147 
RunTask(Task && task,int32_t index)148 void ThreadPool::RunTask(Task &&task, int32_t index)
149 {
150     int32_t workSize = task.workSize;
151     taskQueue_[index].task = std::move(task);
152     // Mark each task should be executed
153     int32_t num = workSize > threadNumber_ ? threadNumber_ : workSize;
154     for (int32_t i = 0; i < num; ++i) {
155         *taskQueue_[index].subTaskFlag[i] = true;
156     }
157 
158     bool complete = true;
159     do {
160         std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
161         complete = true;
162         // 检查是否所有子任务执行结束
163         for (int32_t i = 0; i < num; ++i) {
164             if (*taskQueue_[index].subTaskFlag[i]) {
165                 complete = false;
166                 break;
167             }
168         }
169     } while (!complete);
170 }
171 } // namespace Uscript
172