1 /*
2 * thread_pool.cpp - Thread Pool
3 *
4 * Copyright (c) 2017 Intel Corporation
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * Author: Wind Yuan <feng.yuan@intel.com>
19 */
20
#include "thread_pool.h"

#include <stdlib.h>
22
23 #define XCAM_POOL_MIN_THREADS 2
24 #define XCAM_POOL_MAX_THREADS 1024
25
26 namespace XCam {
27
28 class UserThread
29 : public Thread
30 {
31 public:
UserThread(const SmartPtr<ThreadPool> & pool,const char * name)32 UserThread (const SmartPtr<ThreadPool> &pool, const char *name)
33 : Thread (name)
34 , _pool (pool)
35 {}
36
37 protected:
38 virtual bool started ();
39 virtual void stopped ();
40 virtual bool loop ();
41
42 private:
43 SmartPtr<ThreadPool> _pool;
44 };
45
46 bool
started()47 UserThread::started ()
48 {
49 XCAM_ASSERT (_pool.ptr ());
50 SmartLock lock (_pool->_mutex);
51 return true;
52 }
53
54 void
stopped()55 UserThread::stopped ()
56 {
57 XCAM_LOG_DEBUG ("thread(%s, %p) stopped", XCAM_STR(get_name ()), this);
58 }
59
60 bool
loop()61 UserThread::loop ()
62 {
63 XCAM_ASSERT (_pool.ptr ());
64 {
65 SmartLock lock (_pool->_mutex);
66 if (!_pool->_running)
67 return false;
68 }
69
70 SmartPtr<ThreadPool::UserData> data = _pool->_data_queue.pop ();
71 if (!data.ptr ()) {
72 XCAM_LOG_DEBUG ("user thread(%s) get null data, need stop", XCAM_STR (_pool->get_name ()));
73 return false;
74 }
75
76 {
77 SmartLock lock (_pool->_mutex);
78 XCAM_ASSERT (_pool->_free_threads > 0);
79 --_pool->_free_threads;
80 }
81
82 bool ret = _pool->dispatch (data);
83
84 if (ret) {
85 SmartLock lock (_pool->_mutex);
86 ++_pool->_free_threads;
87 }
88 return ret;
89 }
90
91 bool
dispatch(const SmartPtr<ThreadPool::UserData> & data)92 ThreadPool::dispatch (const SmartPtr<ThreadPool::UserData> &data)
93 {
94 XCAM_FAIL_RETURN (
95 ERROR, data.ptr(), true,
96 "ThreadPool(%s) dispatch NULL data", XCAM_STR (get_name ()));
97 XCamReturn err = data->run ();
98 data->done (err);
99 return true;
100 }
101
ThreadPool(const char * name)102 ThreadPool::ThreadPool (const char *name)
103 : _name (NULL)
104 , _min_threads (XCAM_POOL_MIN_THREADS)
105 , _max_threads (XCAM_POOL_MIN_THREADS)
106 , _allocated_threads (0)
107 , _free_threads (0)
108 , _running (false)
109 {
110 if (name)
111 _name = strndup (name, XCAM_MAX_STR_SIZE);
112 }
113
~ThreadPool()114 ThreadPool::~ThreadPool ()
115 {
116 stop ();
117
118 xcam_mem_clear (_name);
119 }
120
121 bool
set_threads(uint32_t min,uint32_t max)122 ThreadPool::set_threads (uint32_t min, uint32_t max)
123 {
124 XCAM_FAIL_RETURN (
125 ERROR, !_running, false,
126 "ThreadPool(%s) set threads failed, need stop the pool first", XCAM_STR(get_name ()));
127
128 if (min < XCAM_POOL_MIN_THREADS)
129 min = XCAM_POOL_MIN_THREADS;
130 if (max > XCAM_POOL_MAX_THREADS)
131 max = XCAM_POOL_MAX_THREADS;
132
133 if (min > max)
134 min = max;
135
136 _min_threads = min;
137 _max_threads = max;
138 return true;
139 }
140
141 bool
is_running()142 ThreadPool::is_running ()
143 {
144 SmartLock locker(_mutex);
145 return _running;
146 }
147
148 XCamReturn
start()149 ThreadPool::start ()
150 {
151 SmartLock locker(_mutex);
152 if (_running)
153 return XCAM_RETURN_NO_ERROR;
154
155 _free_threads = 0;
156 _allocated_threads = 0;
157 _data_queue.resume_pop ();
158
159 for (uint32_t i = 0; i < _min_threads; ++i) {
160 XCamReturn ret = create_user_thread_unsafe ();
161 XCAM_FAIL_RETURN (
162 ERROR, xcam_ret_is_ok (ret), ret,
163 "thread pool(%s) start failed by creating user thread", XCAM_STR (get_name()));
164 }
165
166 XCAM_ASSERT (_allocated_threads == _min_threads);
167
168 _running = true;
169 return XCAM_RETURN_NO_ERROR;
170 }
171
172 XCamReturn
stop()173 ThreadPool::stop ()
174 {
175 UserThreadList threads;
176 {
177 SmartLock locker(_mutex);
178 if (!_running)
179 return XCAM_RETURN_NO_ERROR;
180
181 _running = false;
182 threads = _thread_list;
183 _thread_list.clear ();
184 }
185
186 for (UserThreadList::iterator i = threads.begin (); i != threads.end (); ++i)
187 {
188 SmartPtr<UserThread> t = *i;
189 XCAM_ASSERT (t.ptr ());
190 t->emit_stop ();
191 }
192
193 _data_queue.pause_pop ();
194 _data_queue.clear ();
195
196 for (UserThreadList::iterator i = threads.begin (); i != threads.end (); ++i)
197 {
198 SmartPtr<UserThread> t = *i;
199 XCAM_ASSERT (t.ptr ());
200 t->stop ();
201 }
202
203 {
204 SmartLock locker(_mutex);
205 _free_threads = 0;
206 _allocated_threads = 0;
207 }
208
209 return XCAM_RETURN_NO_ERROR;
210 }
211
212 XCamReturn
create_user_thread_unsafe()213 ThreadPool::create_user_thread_unsafe ()
214 {
215 char name[256];
216 snprintf (name, 255, "%s-%d", XCAM_STR (get_name()), _allocated_threads);
217 SmartPtr<UserThread> thread = new UserThread (this, name);
218 XCAM_ASSERT (thread.ptr ());
219 XCAM_FAIL_RETURN (
220 ERROR, thread.ptr () && thread->start (), XCAM_RETURN_ERROR_THREAD,
221 "ThreadPool(%s) create user thread failed by starting error", XCAM_STR (get_name()));
222
223 _thread_list.push_back (thread);
224
225 ++_allocated_threads;
226 ++_free_threads;
227 XCAM_ASSERT (_free_threads <= _allocated_threads);
228
229 return XCAM_RETURN_NO_ERROR;
230 }
231
232 XCamReturn
queue(const SmartPtr<UserData> & data)233 ThreadPool::queue (const SmartPtr<UserData> &data)
234 {
235 XCAM_ASSERT (data.ptr ());
236 {
237 SmartLock locker (_mutex);
238 if (!_running)
239 return XCAM_RETURN_ERROR_THREAD;
240 }
241
242 if (!_data_queue.push (data))
243 return XCAM_RETURN_ERROR_THREAD;
244
245 do {
246 SmartLock locker(_mutex);
247 if (!_running) {
248 _data_queue.erase (data);
249 return XCAM_RETURN_ERROR_THREAD;
250 }
251
252 if (_allocated_threads >= _max_threads)
253 break;
254
255 if (!_free_threads)
256 break;
257
258 XCamReturn err = create_user_thread_unsafe ();
259 if (!xcam_ret_is_ok (err) && _allocated_threads) {
260 XCAM_LOG_WARNING ("thread pool(%s) create new thread failed but queue data can continue");
261 break;
262 }
263
264 XCAM_FAIL_RETURN (
265 ERROR, xcam_ret_is_ok (err), err,
266 "thread pool(%s) queue data failed by creating user thread", XCAM_STR (get_name()));
267
268 } while (0);
269
270 return XCAM_RETURN_NO_ERROR;
271 }
272
273 }
274