• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*!
2  * \copy
3  *     Copyright (c)  2009-2015, Cisco Systems
4  *     All rights reserved.
5  *
6  *     Redistribution and use in source and binary forms, with or without
7  *     modification, are permitted provided that the following conditions
8  *     are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *
13  *        * Redistributions in binary form must reproduce the above copyright
14  *          notice, this list of conditions and the following disclaimer in
15  *          the documentation and/or other materials provided with the
16  *          distribution.
17  *
18  *     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  *     "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  *     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21  *     FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22  *     COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23  *     INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24  *     BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25  *     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26  *     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27  *     LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28  *     ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  *     POSSIBILITY OF SUCH DAMAGE.
30  *
31  *
32  * \file    WelsThreadPool.cpp
33  *
34  * \brief   functions for Thread Pool
35  *
36  * \date    5/09/2012 Created
37  *
38  *************************************************************************************
39  */
40 #include "typedefs.h"
41 #include "memory_align.h"
42 #include "WelsThreadPool.h"
43 
44 namespace WelsCommon {
45 
46 namespace {
47 
GetInitLock()48 CWelsLock& GetInitLock() {
49   static CWelsLock *initLock = new CWelsLock;
50   return *initLock;
51 }
52 
53 }
54 
55 int32_t CWelsThreadPool::m_iRefCount = 0;
56 int32_t CWelsThreadPool::m_iMaxThreadNum = DEFAULT_THREAD_NUM;
57 CWelsThreadPool* CWelsThreadPool::m_pThreadPoolSelf = NULL;
58 
CWelsThreadPool()59 CWelsThreadPool::CWelsThreadPool() :
60   m_cWaitedTasks (NULL), m_cIdleThreads (NULL), m_cBusyThreads (NULL) {
61 }
62 
63 
~CWelsThreadPool()64 CWelsThreadPool::~CWelsThreadPool() {
65   //fprintf(stdout, "CWelsThreadPool::~CWelsThreadPool: delete %x, %x, %x\n", m_cWaitedTasks, m_cIdleThreads, m_cBusyThreads);
66   if (0 != m_iRefCount) {
67     m_iRefCount = 0;
68     Uninit();
69   }
70 }
71 
SetThreadNum(int32_t iMaxThreadNum)72 WELS_THREAD_ERROR_CODE CWelsThreadPool::SetThreadNum (int32_t iMaxThreadNum) {
73   CWelsAutoLock  cLock (GetInitLock());
74 
75   if (m_iRefCount != 0) {
76     return WELS_THREAD_ERROR_GENERAL;
77   }
78 
79   if (iMaxThreadNum <= 0) {
80     iMaxThreadNum = 1;
81   }
82   m_iMaxThreadNum = iMaxThreadNum;
83   return WELS_THREAD_ERROR_OK;
84 }
85 
86 
AddReference()87 CWelsThreadPool* CWelsThreadPool::AddReference() {
88   CWelsAutoLock  cLock (GetInitLock());
89   if (m_pThreadPoolSelf == NULL) {
90     m_pThreadPoolSelf = new CWelsThreadPool();
91     if (!m_pThreadPoolSelf) {
92       return NULL;
93     }
94   }
95 
96   if (m_iRefCount == 0) {
97     if (WELS_THREAD_ERROR_OK != m_pThreadPoolSelf->Init()) {
98       m_pThreadPoolSelf->Uninit();
99       delete m_pThreadPoolSelf;
100       m_pThreadPoolSelf = NULL;
101       return NULL;
102     }
103   }
104 
105   ////fprintf(stdout, "m_iRefCount=%d, iMaxThreadNum=%d\n", m_iRefCount, m_iMaxThreadNum);
106 
107   ++ m_iRefCount;
108   //fprintf(stdout, "m_iRefCount2=%d\n", m_iRefCount);
109   return m_pThreadPoolSelf;
110 }
111 
RemoveInstance()112 void CWelsThreadPool::RemoveInstance() {
113   CWelsAutoLock  cLock (GetInitLock());
114   //fprintf(stdout, "m_iRefCount=%d\n", m_iRefCount);
115   -- m_iRefCount;
116   if (0 == m_iRefCount) {
117     StopAllRunning();
118     Uninit();
119     if (m_pThreadPoolSelf) {
120       delete m_pThreadPoolSelf;
121       m_pThreadPoolSelf = NULL;
122     }
123     //fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
124   }
125 }
126 
127 
IsReferenced()128 bool CWelsThreadPool::IsReferenced() {
129   CWelsAutoLock  cLock (GetInitLock());
130   return (m_iRefCount > 0);
131 }
132 
133 
OnTaskStart(CWelsTaskThread * pThread,IWelsTask * pTask)134 WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) {
135   AddThreadToBusyList (pThread);
136   //fprintf(stdout, "CWelsThreadPool::AddThreadToBusyList: Task %x at Thread %x\n", pTask, pThread);
137   return WELS_THREAD_ERROR_OK;
138 }
139 
OnTaskStop(CWelsTaskThread * pThread,IWelsTask * pTask)140 WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) {
141   //fprintf(stdout, "CWelsThreadPool::OnTaskStop 0: Task %x at Thread %x Finished\n", pTask, pThread);
142 
143   RemoveThreadFromBusyList (pThread);
144   AddThreadToIdleQueue (pThread);
145 
146   if (pTask && pTask->GetSink()) {
147     //fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
148     pTask->GetSink()->OnTaskExecuted();
149     ////fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
150   }
151   //if (m_pSink) {
152   //  m_pSink->OnTaskExecuted (pTask);
153   //}
154   //fprintf(stdout, "CWelsThreadPool::OnTaskStop 2: Task %x at Thread %x Finished\n", pTask, pThread);
155 
156   SignalThread();
157 
158   //fprintf(stdout, "ThreadPool: Task %x at Thread %x Finished\n", pTask, pThread);
159   return WELS_THREAD_ERROR_OK;
160 }
161 
Init()162 WELS_THREAD_ERROR_CODE CWelsThreadPool::Init() {
163   //fprintf(stdout, "Enter WelsThreadPool Init\n");
164 
165   CWelsAutoLock  cLock (m_cLockPool);
166 
167   m_cWaitedTasks = new CWelsNonDuplicatedList<IWelsTask>();
168   m_cIdleThreads = new CWelsNonDuplicatedList<CWelsTaskThread>();
169   m_cBusyThreads = new CWelsList<CWelsTaskThread>();
170   if (NULL == m_cWaitedTasks || NULL == m_cIdleThreads || NULL == m_cBusyThreads) {
171     return WELS_THREAD_ERROR_GENERAL;
172   }
173 
174   for (int32_t i = 0; i < m_iMaxThreadNum; i++) {
175     if (WELS_THREAD_ERROR_OK != CreateIdleThread()) {
176       return WELS_THREAD_ERROR_GENERAL;
177     }
178   }
179 
180   if (WELS_THREAD_ERROR_OK != Start()) {
181     return WELS_THREAD_ERROR_GENERAL;
182   }
183 
184   return WELS_THREAD_ERROR_OK;
185 }
186 
StopAllRunning()187 WELS_THREAD_ERROR_CODE CWelsThreadPool::StopAllRunning() {
188   WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
189 
190   ClearWaitedTasks();
191 
192   while (GetBusyThreadNum() > 0) {
193     //WELS_INFO_TRACE ("CWelsThreadPool::Uninit - Waiting all thread to exit");
194     WelsSleep (10);
195   }
196 
197   if (GetIdleThreadNum() != m_iMaxThreadNum) {
198     iReturn = WELS_THREAD_ERROR_GENERAL;
199   }
200 
201   return iReturn;
202 }
203 
Uninit()204 WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
205   WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
206   CWelsAutoLock  cLock (m_cLockPool);
207 
208   iReturn = StopAllRunning();
209   if (WELS_THREAD_ERROR_OK != iReturn) {
210     return iReturn;
211   }
212 
213   m_cLockIdleTasks.Lock();
214   while (m_cIdleThreads->size() > 0) {
215     DestroyThread (m_cIdleThreads->begin());
216     m_cIdleThreads->pop_front();
217   }
218   m_cLockIdleTasks.Unlock();
219 
220   Kill();
221 
222   WELS_DELETE_OP (m_cWaitedTasks);
223   WELS_DELETE_OP (m_cIdleThreads);
224   WELS_DELETE_OP (m_cBusyThreads);
225 
226   return iReturn;
227 }
228 
ExecuteTask()229 void CWelsThreadPool::ExecuteTask() {
230   //fprintf(stdout, "ThreadPool: scheduled tasks: ExecuteTask\n");
231   CWelsTaskThread* pThread = NULL;
232   IWelsTask*    pTask = NULL;
233   while (GetWaitedTaskNum() > 0) {
234     //fprintf(stdout, "ThreadPool:  ExecuteTask: waiting task %d\n", GetWaitedTaskNum());
235     pThread = GetIdleThread();
236     if (pThread == NULL) {
237       //fprintf(stdout, "ThreadPool:  ExecuteTask: no IdleThread\n");
238 
239       break;
240     }
241     pTask = GetWaitedTask();
242     //fprintf(stdout, "ThreadPool:  ExecuteTask = %x at thread %x\n", pTask, pThread);
243     if (pTask) {
244       pThread->SetTask (pTask);
245     } else {
246       AddThreadToIdleQueue (pThread);
247     }
248   }
249 }
250 
QueueTask(IWelsTask * pTask)251 WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
252   CWelsAutoLock  cLock (m_cLockPool);
253 
254   //fprintf(stdout, "CWelsThreadPool::QueueTask: %d, pTask=%x\n", m_iRefCount, pTask);
255   if (GetWaitedTaskNum() == 0) {
256     CWelsTaskThread* pThread = GetIdleThread();
257 
258     if (pThread != NULL) {
259       //fprintf(stdout, "ThreadPool:  ExecuteTask = %x at thread %x\n", pTask, pThread);
260       pThread->SetTask (pTask);
261 
262       return WELS_THREAD_ERROR_OK;
263     }
264   }
265   //fprintf(stdout, "ThreadPool:  AddTaskToWaitedList: %x\n", pTask);
266   if (false == AddTaskToWaitedList (pTask)) {
267     return WELS_THREAD_ERROR_GENERAL;
268   }
269 
270   //fprintf(stdout, "ThreadPool:  SignalThread: %x\n", pTask);
271   SignalThread();
272   return WELS_THREAD_ERROR_OK;
273 }
274 
CreateIdleThread()275 WELS_THREAD_ERROR_CODE CWelsThreadPool::CreateIdleThread() {
276   CWelsTaskThread* pThread = new CWelsTaskThread (this);
277 
278   if (NULL == pThread) {
279     return WELS_THREAD_ERROR_GENERAL;
280   }
281 
282   if (WELS_THREAD_ERROR_OK != pThread->Start()) {
283     return WELS_THREAD_ERROR_GENERAL;
284   }
285   //fprintf(stdout, "ThreadPool:  AddThreadToIdleQueue: %x\n", pThread);
286   AddThreadToIdleQueue (pThread);
287 
288   return WELS_THREAD_ERROR_OK;
289 }
290 
DestroyThread(CWelsTaskThread * pThread)291 void  CWelsThreadPool::DestroyThread (CWelsTaskThread* pThread) {
292   pThread->Kill();
293   WELS_DELETE_OP (pThread);
294 
295   return;
296 }
297 
AddThreadToIdleQueue(CWelsTaskThread * pThread)298 WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToIdleQueue (CWelsTaskThread* pThread) {
299   CWelsAutoLock cLock (m_cLockIdleTasks);
300   m_cIdleThreads->push_back (pThread);
301   return WELS_THREAD_ERROR_OK;
302 }
303 
AddThreadToBusyList(CWelsTaskThread * pThread)304 WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToBusyList (CWelsTaskThread* pThread) {
305   CWelsAutoLock cLock (m_cLockBusyTasks);
306   m_cBusyThreads->push_back (pThread);
307   return WELS_THREAD_ERROR_OK;
308 }
309 
RemoveThreadFromBusyList(CWelsTaskThread * pThread)310 WELS_THREAD_ERROR_CODE CWelsThreadPool::RemoveThreadFromBusyList (CWelsTaskThread* pThread) {
311   CWelsAutoLock cLock (m_cLockBusyTasks);
312   if (m_cBusyThreads->erase (pThread)) {
313     return WELS_THREAD_ERROR_OK;
314   } else {
315     return WELS_THREAD_ERROR_GENERAL;
316   }
317 }
318 
AddTaskToWaitedList(IWelsTask * pTask)319 bool  CWelsThreadPool::AddTaskToWaitedList (IWelsTask* pTask) {
320   CWelsAutoLock  cLock (m_cLockWaitedTasks);
321 
322   return m_cWaitedTasks->push_back (pTask);
323 }
324 
GetIdleThread()325 CWelsTaskThread*   CWelsThreadPool::GetIdleThread() {
326   CWelsAutoLock cLock (m_cLockIdleTasks);
327 
328   if (NULL == m_cIdleThreads || m_cIdleThreads->size() == 0) {
329     return NULL;
330   }
331 
332   //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
333 
334   CWelsTaskThread* pThread = m_cIdleThreads->begin();
335   m_cIdleThreads->pop_front();
336   return pThread;
337 }
338 
GetBusyThreadNum()339 int32_t  CWelsThreadPool::GetBusyThreadNum() {
340   return (m_cBusyThreads?m_cBusyThreads->size():0);
341 }
342 
GetIdleThreadNum()343 int32_t  CWelsThreadPool::GetIdleThreadNum() {
344   return (m_cIdleThreads?m_cIdleThreads->size():0);
345 }
346 
GetWaitedTaskNum()347 int32_t  CWelsThreadPool::GetWaitedTaskNum() {
348   return (m_cWaitedTasks?m_cWaitedTasks->size():0);
349 }
350 
GetWaitedTask()351 IWelsTask* CWelsThreadPool::GetWaitedTask() {
352   CWelsAutoLock lock (m_cLockWaitedTasks);
353 
354   if (NULL==m_cWaitedTasks || m_cWaitedTasks->size() == 0) {
355     return NULL;
356   }
357 
358   IWelsTask* pTask = m_cWaitedTasks->begin();
359 
360   m_cWaitedTasks->pop_front();
361 
362   return pTask;
363 }
364 
ClearWaitedTasks()365 void  CWelsThreadPool::ClearWaitedTasks() {
366   CWelsAutoLock cLock (m_cLockWaitedTasks);
367   if (NULL == m_cWaitedTasks) {
368     return;
369   }
370   IWelsTask* pTask = NULL;
371   while (0 != m_cWaitedTasks->size()) {
372     pTask = m_cWaitedTasks->begin();
373     if (pTask->GetSink()) {
374       pTask->GetSink()->OnTaskCancelled();
375     }
376     m_cWaitedTasks->pop_front();
377   }
378 }
379 
380 }
381