1 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
2 // -*- Mode: C++ -*-
3 //
4 // Copyright (C) 2013-2022 Red Hat, Inc.
5 //
6 // Author: Dodji Seketeli
7
8 /// @file
9 ///
10 /// This file implements the worker threads (or thread pool) design
11 /// pattern. It aims at performing a set of tasks in parallel, using
12 /// the multi-threading capabilities of the underlying processor(s).
13
14 #include <assert.h>
15 #include <unistd.h>
16 #include <pthread.h>
17 #include <queue>
18 #include <vector>
19 #include <iostream>
20
21 #include "abg-fwd.h"
22 #include "abg-internal.h"
23 // <headers defining libabigail's API go under here>
24 ABG_BEGIN_EXPORT_DECLARATIONS
25
26 #include "abg-workers.h"
27
28 ABG_END_EXPORT_DECLARATIONS
29 // </headers defining libabigail's API>
30
31 namespace abigail
32 {
33
34 namespace workers
35 {
36
37 /// @defgroup thread_pool Worker Threads
38 /// @{
39 ///
40 /// \brief Libabigail's implementation of Thread Pools.
41 ///
42 /// The main interface of this pattern is a @ref queue of @ref tasks
43 /// to be performed. Associated to that queue are a set of worker
44 /// threads (these are native posix threads) that sits there, idle,
45 /// until at least one @ref task is added to the queue.
46 ///
47 /// When a @ref task is added to the @ref queue, one thread is woken
48 /// up, picks the @ref task, removes it from the @ref queue, and
49 /// executes the instructions it carries. We say the worker thread
50 /// performs the @ref task.
51 ///
52 /// When the worker thread is done performing the @ref task, the
53 /// performed @ref task is added to another queue, named as the "done
54 /// queue". Then the thread looks at the @ref queue of tasks to be
55 /// performed again, and if there is at least one task in that queue,
56 /// the same process as above is done. Otherwise, the thread blocks,
57 /// waiting for a new task to be added to the queue.
58 ///
59 /// By default, the number of worker threads is equal to the number of
60 /// execution threads advertised by the underlying processor.
61 ///
62 /// Note that the user of the queue can either wait for all the tasks
63 /// to be performed by the pool of threads,and them stop them, get the
64 /// vector of done tasks and proceed to whatever computation she may
65 /// need next.
66 ///
67 /// Or she can choose to be asynchronously notified whenever a task is
68 /// performed and added to the "done queue".
69 ///
70 ///@}
71
72 /// @return The number of hardware threads of executions advertised by
73 /// the underlying processor.
74 size_t
get_number_of_threads()75 get_number_of_threads()
76 {return sysconf(_SC_NPROCESSORS_ONLN);}
77
78 /// The abstraction of a worker thread.
79 ///
80 /// This is an implementation detail of the @ref queue public
81 /// interface type of this worker thread design pattern.
82 struct worker
83 {
84 pthread_t tid;
85
workerabigail::workers::worker86 worker()
87 : tid()
88 {}
89
90 static queue::priv*
91 wait_to_execute_a_task(queue::priv*);
92 }; // end struct worker
93
94 // </worker declarations>
95
96 // <queue stuff>
97
98 /// The private data structure of the task queue.
99 struct queue::priv
100 {
101 // A boolean to say if the user wants to shutdown the worker
102 // threads. guarded by tasks_todo_mutex.
103 // TODO: once we have std::atomic<bool>, use it and reconsider the
104 // synchronization around its reads and writes
105 bool bring_workers_down;
106 // The number of worker threads.
107 size_t num_workers;
108 // A mutex that protects the todo tasks queue from being accessed in
109 // read/write by two threads at the same time.
110 pthread_mutex_t tasks_todo_mutex;
111 // The queue condition variable. This condition is used to make the
112 // worker threads sleep until a new task is added to the queue of
113 // todo tasks. Whenever a new task is added to that queue, a signal
114 // is sent to all a thread sleeping on this condition variable.
115 pthread_cond_t tasks_todo_cond;
116 // A mutex that protects the done tasks queue from being accessed in
117 // read/write by two threads at the same time.
118 pthread_mutex_t tasks_done_mutex;
119 // A condition to be signalled whenever there is a task done. That is being
120 // used to wait for tasks completed when bringing the workers down.
121 pthread_cond_t tasks_done_cond;
122 // The todo task queue itself.
123 std::queue<task_sptr> tasks_todo;
124 // The done task queue itself.
125 std::vector<task_sptr> tasks_done;
126 // This functor is invoked to notify the user of this queue that a
127 // task has been completed and has been added to the done tasks
128 // vector. We call it a notifier. This notifier is the default
129 // notifier of the work queue; the one that is used when the user
130 // has specified no notifier. It basically does nothing.
131 static task_done_notify default_notify;
132 // This is a reference to the the notifier that is actually used in
133 // the queue. It's either the one specified by the user or the
134 // default one.
135 task_done_notify& notify;
136 // A vector of the worker threads.
137 std::vector<worker> workers;
138
139 /// A constructor of @ref queue::priv.
140 ///
141 /// @param nb_workers the number of worker threads to have in the
142 /// thread pool.
143 ///
144 /// @param task_done_notify a functor object that is invoked by the
145 /// worker thread which has performed the task, right after it's
146 /// added that task to the vector of the done tasks.
privabigail::workers::queue::priv147 priv(size_t nb_workers = get_number_of_threads(),
148 task_done_notify& n = default_notify)
149 : bring_workers_down(),
150 num_workers(nb_workers),
151 tasks_todo_mutex(),
152 tasks_todo_cond(),
153 tasks_done_mutex(),
154 tasks_done_cond(),
155 notify(n)
156 {create_workers();}
157
158 /// Create the worker threads pool and have all threads sit idle,
159 /// waiting for a task to be added to the todo queue.
160 void
create_workersabigail::workers::queue::priv161 create_workers()
162 {
163 for (unsigned i = 0; i < num_workers; ++i)
164 {
165 worker w;
166 ABG_ASSERT(pthread_create(&w.tid,
167 /*attr=*/0,
168 (void*(*)(void*))&worker::wait_to_execute_a_task,
169 this) == 0);
170 workers.push_back(w);
171 }
172 }
173
174 /// Submit a task to the queue of tasks to be performed.
175 ///
176 /// This wakes up one thread from the pool which immediatly starts
177 /// performing the task. When it's done with the task, it goes back
178 /// to be suspended, waiting for a new task to be scheduled.
179 ///
180 /// @param t the task to schedule. Note that a nil task won't be
181 /// scheduled. If the queue is empty, the task @p t won't be
182 /// scheduled either.
183 ///
184 /// @return true iff the task @p t was successfully scheduled.
185 bool
schedule_taskabigail::workers::queue::priv186 schedule_task(const task_sptr& t)
187 {
188 if (workers.empty() || !t)
189 return false;
190
191 pthread_mutex_lock(&tasks_todo_mutex);
192 tasks_todo.push(t);
193 pthread_mutex_unlock(&tasks_todo_mutex);
194 pthread_cond_signal(&tasks_todo_cond);
195 return true;
196 }
197
198 /// Submit a vector of task to the queue of tasks to be performed.
199 ///
200 /// This wakes up threads of the pool which immediatly start
201 /// performing the tasks. When they are done with the task, they go
202 /// back to be suspended, waiting for new tasks to be scheduled.
203 ///
204 /// @param tasks the tasks to schedule.
205 bool
schedule_tasksabigail::workers::queue::priv206 schedule_tasks(const tasks_type& tasks)
207 {
208 bool is_ok= true;
209 for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t)
210 is_ok &= schedule_task(*t);
211 return is_ok;
212 }
213
214 /// Signal all the threads (of the pool) which are suspended and
215 /// waiting to perform a task, so that they wake up and end up their
216 /// execution. If there is no task to perform, they just end their
217 /// execution. If there are tasks to perform, they finish them and
218 /// then end their execution.
219 ///
220 /// This function then joins all the tasks of the pool, waiting for
221 /// them to finish, and then it returns. In other words, this
222 /// function suspends the thread of the caller, waiting for the
223 /// worker threads to finish their tasks, and end their execution.
224 ///
225 /// If the user code wants to work with the thread pool again,
226 /// she'll need to create them again, using the member function
227 /// create_workers().
228 void
do_bring_workers_downabigail::workers::queue::priv229 do_bring_workers_down()
230 {
231 if (workers.empty())
232 return;
233
234 // Wait for the todo list to be empty to make sure all tasks got picked up
235 pthread_mutex_lock(&tasks_todo_mutex);
236 while (!tasks_todo.empty())
237 pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
238
239 bring_workers_down = true;
240 pthread_mutex_unlock(&tasks_todo_mutex);
241
242 // Now that the task queue is empty, drain the workers by waking them up,
243 // letting them finish their final task before termination.
244 ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
245
246 for (std::vector<worker>::const_iterator i = workers.begin();
247 i != workers.end();
248 ++i)
249 ABG_ASSERT(pthread_join(i->tid, /*thread_return=*/0) == 0);
250 workers.clear();
251 }
252
253 /// Destructors of @ref queue::priv type.
~privabigail::workers::queue::priv254 ~priv()
255 {do_bring_workers_down();}
256
257 }; //end struct queue::priv
258
259 // default initialize the default notifier.
260 queue::task_done_notify queue::priv::default_notify;
261
262 /// Default constructor of the @ref queue type.
263 ///
264 /// By default the queue is created with a number of worker threaders
265 /// which is equals to the number of simultaneous execution threads
266 /// supported by the underlying processor.
queue()267 queue::queue()
268 : p_(new priv())
269 {}
270
271 /// Constructor of the @ref queue type.
272 ///
273 /// @param number_of_workers the number of worker threads to have in
274 /// the pool.
queue(unsigned number_of_workers)275 queue::queue(unsigned number_of_workers)
276 : p_(new priv(number_of_workers))
277 {}
278
279 /// Constructor of the @ref queue type.
280 ///
281 /// @param number_of_workers the number of worker threads to have in
282 /// the pool.
283 ///
284 /// @param the notifier to invoke when a task is done doing its job.
285 /// Users should create a type that inherit this @ref task_done_notify
286 /// class and overload its virtual task_done_notify::operator()
287 /// operator function. Note that the code of that
288 /// task_done_notify::operator() is assured to run in *sequence*, with
289 /// respect to the code of other task_done_notify::operator() from
290 /// other tasks.
queue(unsigned number_of_workers,task_done_notify & notifier)291 queue::queue(unsigned number_of_workers,
292 task_done_notify& notifier)
293 : p_(new priv(number_of_workers, notifier))
294 {}
295
296 /// Getter of the size of the queue. This gives the number of task
297 /// still present in the queue.
298 ///
299 /// @return the number of task still present in the queue.
300 size_t
get_size() const301 queue::get_size() const
302 {return p_->tasks_todo.size();}
303
304 /// Submit a task to the queue of tasks to be performed.
305 ///
306 /// This wakes up one thread from the pool which immediatly starts
307 /// performing the task. When it's done with the task, it goes back
308 /// to be suspended, waiting for a new task to be scheduled.
309 ///
310 /// @param t the task to schedule. Note that if the queue is empty or
311 /// if the task is nil, the task is not scheduled.
312 ///
313 /// @return true iff the task was successfully scheduled.
314 bool
schedule_task(const task_sptr & t)315 queue::schedule_task(const task_sptr& t)
316 {return p_->schedule_task(t);}
317
318 /// Submit a vector of tasks to the queue of tasks to be performed.
319 ///
320 /// This wakes up one or more threads from the pool which immediatly
321 /// start performing the tasks. When the threads are done with the
322 /// tasks, they goes back to be suspended, waiting for a new task to
323 /// be scheduled.
324 ///
325 /// @param tasks the tasks to schedule.
326 bool
schedule_tasks(const tasks_type & tasks)327 queue::schedule_tasks(const tasks_type& tasks)
328 {return p_->schedule_tasks(tasks);}
329
330 /// Suspends the current thread until all worker threads finish
331 /// performing the tasks they are executing.
332 ///
333 /// If the worker threads were suspended waiting for a new task to
334 /// perform, they are woken up and their execution ends.
335 ///
336 /// The execution of the current thread is resumed when all the
337 /// threads of the pool have finished their execution and are
338 /// terminated.
339 void
wait_for_workers_to_complete()340 queue::wait_for_workers_to_complete()
341 {p_->do_bring_workers_down();}
342
343 /// Getter of the vector of tasks that got performed.
344 ///
345 /// @return the vector of tasks that got performed.
346 std::vector<task_sptr>&
get_completed_tasks() const347 queue::get_completed_tasks() const
348 {return p_->tasks_done;}
349
350 /// Destructor for the @ref queue type.
~queue()351 queue::~queue()
352 {}
353
354 /// The default function invocation operator of the @ref queue type.
355 ///
356 /// This does nothing.
357 void
operator ()(const task_sptr &)358 queue::task_done_notify::operator()(const task_sptr&/*task_done*/)
359 {
360 }
361
362 // </queue stuff>
363
364 // <worker definitions>
365
366 /// Wait to be woken up by a thread condition signal, then look if
367 /// there is a task to be executed. If there is, then pick one (in a
368 /// FIFO manner), execute it, and put the executed task into the set
369 /// of done tasks.
370 ///
371 /// @param t the private data of the "task queue" type to consider.
372 ///
373 /// @param return the same private data of the task queue type we got
374 /// in argument.
375 queue::priv*
wait_to_execute_a_task(queue::priv * p)376 worker::wait_to_execute_a_task(queue::priv* p)
377 {
378 while (true)
379 {
380 pthread_mutex_lock(&p->tasks_todo_mutex);
381 // If there is no more tasks to perform and the queue is not to
382 // be brought down then wait (sleep) for new tasks to come up.
383 while (p->tasks_todo.empty() && !p->bring_workers_down)
384 pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
385
386 // We were woken up. So maybe there are tasks to perform? If
387 // so, get a task from the queue ...
388 task_sptr t;
389 if (!p->tasks_todo.empty())
390 {
391 t = p->tasks_todo.front();
392 p->tasks_todo.pop();
393 }
394 pthread_mutex_unlock(&p->tasks_todo_mutex);
395
396 // If we've got a task to perform then perform it and when it's
397 // done then add to the set of tasks that are done.
398 if (t)
399 {
400 t->perform();
401
402 // Add the task to the vector of tasks that are done and
403 // notify listeners about the fact that the task is done.
404 //
405 // Note that this (including the notification) is not
406 // happening in parallel. So the code performed by the
407 // notifier during the notification is running sequentially,
408 // not in parallel with any other task that was just done
409 // and that is notifying its listeners.
410 pthread_mutex_lock(&p->tasks_done_mutex);
411 p->tasks_done.push_back(t);
412 p->notify(t);
413 pthread_mutex_unlock(&p->tasks_done_mutex);
414 pthread_cond_signal(&p->tasks_done_cond);
415 }
416
417 // ensure we access bring_workers_down always guarded
418 bool drop_out = false;
419 pthread_mutex_lock(&p->tasks_todo_mutex);
420 drop_out = p->bring_workers_down;
421 pthread_mutex_unlock(&p->tasks_todo_mutex);
422 if (drop_out)
423 break;
424 }
425
426 return p;
427 }
428 // </worker definitions>
429 } //end namespace workers
430 } //end namespace abigail
431