1 /*!
2 * \copy
3 * Copyright (c) 2009-2015, Cisco Systems
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *
32 * \file WelsThreadPool.cpp
33 *
34 * \brief functions for Thread Pool
35 *
36 * \date 5/09/2012 Created
37 *
38 *************************************************************************************
39 */
40 #include "typedefs.h"
41 #include "memory_align.h"
42 #include "WelsThreadPool.h"
43
44 namespace WelsCommon {
45
46 namespace {
47
GetInitLock()48 CWelsLock& GetInitLock() {
49 static CWelsLock *initLock = new CWelsLock;
50 return *initLock;
51 }
52
53 }
54
55 int32_t CWelsThreadPool::m_iRefCount = 0;
56 int32_t CWelsThreadPool::m_iMaxThreadNum = DEFAULT_THREAD_NUM;
57 CWelsThreadPool* CWelsThreadPool::m_pThreadPoolSelf = NULL;
58
CWelsThreadPool()59 CWelsThreadPool::CWelsThreadPool() :
60 m_cWaitedTasks (NULL), m_cIdleThreads (NULL), m_cBusyThreads (NULL) {
61 }
62
63
~CWelsThreadPool()64 CWelsThreadPool::~CWelsThreadPool() {
65 //fprintf(stdout, "CWelsThreadPool::~CWelsThreadPool: delete %x, %x, %x\n", m_cWaitedTasks, m_cIdleThreads, m_cBusyThreads);
66 if (0 != m_iRefCount) {
67 m_iRefCount = 0;
68 Uninit();
69 }
70 }
71
SetThreadNum(int32_t iMaxThreadNum)72 WELS_THREAD_ERROR_CODE CWelsThreadPool::SetThreadNum (int32_t iMaxThreadNum) {
73 CWelsAutoLock cLock (GetInitLock());
74
75 if (m_iRefCount != 0) {
76 return WELS_THREAD_ERROR_GENERAL;
77 }
78
79 if (iMaxThreadNum <= 0) {
80 iMaxThreadNum = 1;
81 }
82 m_iMaxThreadNum = iMaxThreadNum;
83 return WELS_THREAD_ERROR_OK;
84 }
85
86
AddReference()87 CWelsThreadPool* CWelsThreadPool::AddReference() {
88 CWelsAutoLock cLock (GetInitLock());
89 if (m_pThreadPoolSelf == NULL) {
90 m_pThreadPoolSelf = new CWelsThreadPool();
91 if (!m_pThreadPoolSelf) {
92 return NULL;
93 }
94 }
95
96 if (m_iRefCount == 0) {
97 if (WELS_THREAD_ERROR_OK != m_pThreadPoolSelf->Init()) {
98 m_pThreadPoolSelf->Uninit();
99 delete m_pThreadPoolSelf;
100 m_pThreadPoolSelf = NULL;
101 return NULL;
102 }
103 }
104
105 ////fprintf(stdout, "m_iRefCount=%d, iMaxThreadNum=%d\n", m_iRefCount, m_iMaxThreadNum);
106
107 ++ m_iRefCount;
108 //fprintf(stdout, "m_iRefCount2=%d\n", m_iRefCount);
109 return m_pThreadPoolSelf;
110 }
111
RemoveInstance()112 void CWelsThreadPool::RemoveInstance() {
113 CWelsAutoLock cLock (GetInitLock());
114 //fprintf(stdout, "m_iRefCount=%d\n", m_iRefCount);
115 -- m_iRefCount;
116 if (0 == m_iRefCount) {
117 StopAllRunning();
118 Uninit();
119 if (m_pThreadPoolSelf) {
120 delete m_pThreadPoolSelf;
121 m_pThreadPoolSelf = NULL;
122 }
123 //fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
124 }
125 }
126
127
IsReferenced()128 bool CWelsThreadPool::IsReferenced() {
129 CWelsAutoLock cLock (GetInitLock());
130 return (m_iRefCount > 0);
131 }
132
133
OnTaskStart(CWelsTaskThread * pThread,IWelsTask * pTask)134 WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) {
135 AddThreadToBusyList (pThread);
136 //fprintf(stdout, "CWelsThreadPool::AddThreadToBusyList: Task %x at Thread %x\n", pTask, pThread);
137 return WELS_THREAD_ERROR_OK;
138 }
139
OnTaskStop(CWelsTaskThread * pThread,IWelsTask * pTask)140 WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) {
141 //fprintf(stdout, "CWelsThreadPool::OnTaskStop 0: Task %x at Thread %x Finished\n", pTask, pThread);
142
143 RemoveThreadFromBusyList (pThread);
144 AddThreadToIdleQueue (pThread);
145
146 if (pTask && pTask->GetSink()) {
147 //fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
148 pTask->GetSink()->OnTaskExecuted();
149 ////fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
150 }
151 //if (m_pSink) {
152 // m_pSink->OnTaskExecuted (pTask);
153 //}
154 //fprintf(stdout, "CWelsThreadPool::OnTaskStop 2: Task %x at Thread %x Finished\n", pTask, pThread);
155
156 SignalThread();
157
158 //fprintf(stdout, "ThreadPool: Task %x at Thread %x Finished\n", pTask, pThread);
159 return WELS_THREAD_ERROR_OK;
160 }
161
Init()162 WELS_THREAD_ERROR_CODE CWelsThreadPool::Init() {
163 //fprintf(stdout, "Enter WelsThreadPool Init\n");
164
165 CWelsAutoLock cLock (m_cLockPool);
166
167 m_cWaitedTasks = new CWelsNonDuplicatedList<IWelsTask>();
168 m_cIdleThreads = new CWelsNonDuplicatedList<CWelsTaskThread>();
169 m_cBusyThreads = new CWelsList<CWelsTaskThread>();
170 if (NULL == m_cWaitedTasks || NULL == m_cIdleThreads || NULL == m_cBusyThreads) {
171 return WELS_THREAD_ERROR_GENERAL;
172 }
173
174 for (int32_t i = 0; i < m_iMaxThreadNum; i++) {
175 if (WELS_THREAD_ERROR_OK != CreateIdleThread()) {
176 return WELS_THREAD_ERROR_GENERAL;
177 }
178 }
179
180 if (WELS_THREAD_ERROR_OK != Start()) {
181 return WELS_THREAD_ERROR_GENERAL;
182 }
183
184 return WELS_THREAD_ERROR_OK;
185 }
186
StopAllRunning()187 WELS_THREAD_ERROR_CODE CWelsThreadPool::StopAllRunning() {
188 WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
189
190 ClearWaitedTasks();
191
192 while (GetBusyThreadNum() > 0) {
193 //WELS_INFO_TRACE ("CWelsThreadPool::Uninit - Waiting all thread to exit");
194 WelsSleep (10);
195 }
196
197 if (GetIdleThreadNum() != m_iMaxThreadNum) {
198 iReturn = WELS_THREAD_ERROR_GENERAL;
199 }
200
201 return iReturn;
202 }
203
Uninit()204 WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
205 WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
206 CWelsAutoLock cLock (m_cLockPool);
207
208 iReturn = StopAllRunning();
209 if (WELS_THREAD_ERROR_OK != iReturn) {
210 return iReturn;
211 }
212
213 m_cLockIdleTasks.Lock();
214 while (m_cIdleThreads->size() > 0) {
215 DestroyThread (m_cIdleThreads->begin());
216 m_cIdleThreads->pop_front();
217 }
218 m_cLockIdleTasks.Unlock();
219
220 Kill();
221
222 WELS_DELETE_OP (m_cWaitedTasks);
223 WELS_DELETE_OP (m_cIdleThreads);
224 WELS_DELETE_OP (m_cBusyThreads);
225
226 return iReturn;
227 }
228
ExecuteTask()229 void CWelsThreadPool::ExecuteTask() {
230 //fprintf(stdout, "ThreadPool: scheduled tasks: ExecuteTask\n");
231 CWelsTaskThread* pThread = NULL;
232 IWelsTask* pTask = NULL;
233 while (GetWaitedTaskNum() > 0) {
234 //fprintf(stdout, "ThreadPool: ExecuteTask: waiting task %d\n", GetWaitedTaskNum());
235 pThread = GetIdleThread();
236 if (pThread == NULL) {
237 //fprintf(stdout, "ThreadPool: ExecuteTask: no IdleThread\n");
238
239 break;
240 }
241 pTask = GetWaitedTask();
242 //fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread);
243 if (pTask) {
244 pThread->SetTask (pTask);
245 } else {
246 AddThreadToIdleQueue (pThread);
247 }
248 }
249 }
250
QueueTask(IWelsTask * pTask)251 WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
252 CWelsAutoLock cLock (m_cLockPool);
253
254 //fprintf(stdout, "CWelsThreadPool::QueueTask: %d, pTask=%x\n", m_iRefCount, pTask);
255 if (GetWaitedTaskNum() == 0) {
256 CWelsTaskThread* pThread = GetIdleThread();
257
258 if (pThread != NULL) {
259 //fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread);
260 pThread->SetTask (pTask);
261
262 return WELS_THREAD_ERROR_OK;
263 }
264 }
265 //fprintf(stdout, "ThreadPool: AddTaskToWaitedList: %x\n", pTask);
266 if (false == AddTaskToWaitedList (pTask)) {
267 return WELS_THREAD_ERROR_GENERAL;
268 }
269
270 //fprintf(stdout, "ThreadPool: SignalThread: %x\n", pTask);
271 SignalThread();
272 return WELS_THREAD_ERROR_OK;
273 }
274
CreateIdleThread()275 WELS_THREAD_ERROR_CODE CWelsThreadPool::CreateIdleThread() {
276 CWelsTaskThread* pThread = new CWelsTaskThread (this);
277
278 if (NULL == pThread) {
279 return WELS_THREAD_ERROR_GENERAL;
280 }
281
282 if (WELS_THREAD_ERROR_OK != pThread->Start()) {
283 return WELS_THREAD_ERROR_GENERAL;
284 }
285 //fprintf(stdout, "ThreadPool: AddThreadToIdleQueue: %x\n", pThread);
286 AddThreadToIdleQueue (pThread);
287
288 return WELS_THREAD_ERROR_OK;
289 }
290
DestroyThread(CWelsTaskThread * pThread)291 void CWelsThreadPool::DestroyThread (CWelsTaskThread* pThread) {
292 pThread->Kill();
293 WELS_DELETE_OP (pThread);
294
295 return;
296 }
297
AddThreadToIdleQueue(CWelsTaskThread * pThread)298 WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToIdleQueue (CWelsTaskThread* pThread) {
299 CWelsAutoLock cLock (m_cLockIdleTasks);
300 m_cIdleThreads->push_back (pThread);
301 return WELS_THREAD_ERROR_OK;
302 }
303
AddThreadToBusyList(CWelsTaskThread * pThread)304 WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToBusyList (CWelsTaskThread* pThread) {
305 CWelsAutoLock cLock (m_cLockBusyTasks);
306 m_cBusyThreads->push_back (pThread);
307 return WELS_THREAD_ERROR_OK;
308 }
309
RemoveThreadFromBusyList(CWelsTaskThread * pThread)310 WELS_THREAD_ERROR_CODE CWelsThreadPool::RemoveThreadFromBusyList (CWelsTaskThread* pThread) {
311 CWelsAutoLock cLock (m_cLockBusyTasks);
312 if (m_cBusyThreads->erase (pThread)) {
313 return WELS_THREAD_ERROR_OK;
314 } else {
315 return WELS_THREAD_ERROR_GENERAL;
316 }
317 }
318
AddTaskToWaitedList(IWelsTask * pTask)319 bool CWelsThreadPool::AddTaskToWaitedList (IWelsTask* pTask) {
320 CWelsAutoLock cLock (m_cLockWaitedTasks);
321
322 return m_cWaitedTasks->push_back (pTask);
323 }
324
GetIdleThread()325 CWelsTaskThread* CWelsThreadPool::GetIdleThread() {
326 CWelsAutoLock cLock (m_cLockIdleTasks);
327
328 if (NULL == m_cIdleThreads || m_cIdleThreads->size() == 0) {
329 return NULL;
330 }
331
332 //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
333
334 CWelsTaskThread* pThread = m_cIdleThreads->begin();
335 m_cIdleThreads->pop_front();
336 return pThread;
337 }
338
GetBusyThreadNum()339 int32_t CWelsThreadPool::GetBusyThreadNum() {
340 return (m_cBusyThreads?m_cBusyThreads->size():0);
341 }
342
GetIdleThreadNum()343 int32_t CWelsThreadPool::GetIdleThreadNum() {
344 return (m_cIdleThreads?m_cIdleThreads->size():0);
345 }
346
GetWaitedTaskNum()347 int32_t CWelsThreadPool::GetWaitedTaskNum() {
348 return (m_cWaitedTasks?m_cWaitedTasks->size():0);
349 }
350
GetWaitedTask()351 IWelsTask* CWelsThreadPool::GetWaitedTask() {
352 CWelsAutoLock lock (m_cLockWaitedTasks);
353
354 if (NULL==m_cWaitedTasks || m_cWaitedTasks->size() == 0) {
355 return NULL;
356 }
357
358 IWelsTask* pTask = m_cWaitedTasks->begin();
359
360 m_cWaitedTasks->pop_front();
361
362 return pTask;
363 }
364
ClearWaitedTasks()365 void CWelsThreadPool::ClearWaitedTasks() {
366 CWelsAutoLock cLock (m_cLockWaitedTasks);
367 if (NULL == m_cWaitedTasks) {
368 return;
369 }
370 IWelsTask* pTask = NULL;
371 while (0 != m_cWaitedTasks->size()) {
372 pTask = m_cWaitedTasks->begin();
373 if (pTask->GetSink()) {
374 pTask->GetSink()->OnTaskCancelled();
375 }
376 m_cWaitedTasks->pop_front();
377 }
378 }
379
380 }
381