abg-workers: Rework the worker queue to improve concurrent behaviour
Commit Message
This patch refactors the abigail::workers::queue and
abigail::workers::worker implementations to avoid holding locking
primitives longer than necessary.
In particular, the queue_cond_mutex was held during the entiry worker
runtime, effectively serializing the workers. Hence, use a mutex+cond
pair for each, the input and output queue and only synchronize around
the interaction with their corresponding queues. The
tasks_todo_(mutex|cond) are meant to synchronize scheduling and
distribution of work among workers, while tasks_done_(mutex|cond) are
used for synchronizing threads when putting back the tasks to the output
queue and to hold back threads waiting for the queue and workers to
drain.
Along that way, I did some cleanup that was now possible.
- Move entire implementation of abigail::workers::task into header.
- Make default_notify a static member.
- Replace the multiple constructors with one with default arguments.
* include/abg-workers.h (workers::task): move entire
implementation to header and drop superfluous forward declaration.
* src/abg-workers.cc (workers::task):: Likewise.
(workers::queue::priv): Drop queue_cond_mutex, rename queue_cond
to tasks_todo_cond, add task_done_cond, make default_notify
static.
(workers::queue::priv::priv): Add default arguments to fully
qualified constructor, drop the remaining ones.
(workers::queue:prive::more_tasks_to_execute): Drop method.
(workers::queue:prive::schedule_task): Do not synchronize access
to the queue condition variable, but only on the mutex.
(do_bring_workers_down): Likewise. Also await tasks_done to be
empty.
(workers::queue:prive::worker::wait_to_execute_a_task): Await
tasks on the tasks_todo with tasks_todo_(cond|mutex) and signal
task completion to tasks_done_cond.
Signed-off-by: Matthias Maennich <maennich@google.com>
---
include/abg-workers.h | 8 +--
src/abg-workers.cc | 124 ++++++++++++------------------------------
2 files changed, 39 insertions(+), 93 deletions(-)
@@ -45,9 +45,6 @@ namespace abigail
namespace workers
{
-class task;
-typedef shared_ptr<task> task_sptr;
-
size_t get_number_of_threads();
/// This represents a task to be performed.
@@ -60,13 +57,14 @@ size_t get_number_of_threads();
class task
{
public:
- task();
virtual void
perform() = 0;
- virtual ~task();
+ virtual ~task(){};
}; // end class task.
+typedef shared_ptr<task> task_sptr;
+
/// This represents a queue of tasks to be performed.
///
/// Tasks are performed by a number of worker threads.
@@ -90,23 +90,6 @@ size_t
get_number_of_threads()
{return sysconf(_SC_NPROCESSORS_ONLN);}
-// <task stuff>
-
-/// The default constructor of the @ref task type
-task::task()
-{}
-
-/// Destructor of the @ref task type.
-task::~task()
-{}
-// </task stuff>
-
-// <worker declarations>
-struct worker;
-
-/// A convenience typedef for a shared_ptr to the @ref worker type.
-typedef shared_ptr<worker> worker_sptr;
-
/// The abstraction of a worker thread.
///
/// This is an implementation detail of the @ref queue public
@@ -135,63 +118,39 @@ struct queue::priv
bool bring_workers_down;
// The number of worker threads.
size_t num_workers;
- // The mutex associated to the queue condition variable below.
- pthread_mutex_t queue_cond_mutex;
+ // A mutex that protects the todo tasks queue from being accessed in
+ // read/write by two threads at the same time.
+ pthread_mutex_t tasks_todo_mutex;
// The queue condition variable. This condition is used to make the
// worker threads sleep until a new task is added to the queue of
// todo tasks. Whenever a new task is added to that queue, a signal
- // is sent to all the threads sleeping on this condition variable,
- // and only one of them wakes up and takes the mutex
- // queue_cond_mutex above.
- pthread_cond_t queue_cond;
- // A mutex that protects the todo tasks queue from being accessed in
- // read/write by two threads at the same time.
- mutable pthread_mutex_t tasks_todo_mutex;
+ // is sent to all a thread sleeping on this condition variable.
+ pthread_cond_t tasks_todo_cond;
// A mutex that protects the done tasks queue from being accessed in
// read/write by two threads at the same time.
pthread_mutex_t tasks_done_mutex;
+ // A condition to be signalled whenever there is a task done. That is being
+ // used to wait for tasks completed when bringing the workers down.
+ pthread_cond_t tasks_done_cond;
+
// The todo task queue itself.
- std::queue<task_sptr> tasks_todo;
+ std::queue<task_sptr> tasks_todo;
// The done task queue itself.
std::vector<task_sptr> tasks_done;
+
// This functor is invoked to notify the user of this queue that a
// task has been completed and has been added to the done tasks
// vector. We call it a notifier. This notifier is the default
// notifier of the work queue; the one that is used when the user
// has specified no notifier. It basically does nothing.
- task_done_notify default_notify;
+ static task_done_notify default_notify;
// This is a reference to the the notifier that is actually used in
// the queue. It's either the one specified by the user or the
// default one.
- task_done_notify& notify;
+ task_done_notify& notify;
// A vector of the worker threads.
std::vector<worker> workers;
- /// The default constructor of @ref queue::priv.
- priv()
- : bring_workers_down(),
- num_workers(get_number_of_threads()),
- queue_cond_mutex(),
- queue_cond(),
- tasks_todo_mutex(),
- tasks_done_mutex(),
- notify(default_notify)
- {create_workers();}
-
- /// A constructor of @ref queue::priv.
- ///
- /// @param nb_workers the number of worker threads to have in the
- /// thread pool.
- priv(size_t nb_workers)
- : bring_workers_down(),
- num_workers(nb_workers),
- queue_cond_mutex(),
- queue_cond(),
- tasks_todo_mutex(),
- tasks_done_mutex(),
- notify(default_notify)
- {create_workers();}
-
/// A constructor of @ref queue::priv.
///
/// @param nb_workers the number of worker threads to have in the
@@ -200,28 +159,17 @@ struct queue::priv
/// @param task_done_notify a functor object that is invoked by the
/// worker thread which has performed the task, right after it's
/// added that task to the vector of the done tasks.
- priv(size_t nb_workers, task_done_notify& n)
+ priv(size_t nb_workers = get_number_of_threads(),
+ task_done_notify& n = default_notify)
: bring_workers_down(),
num_workers(nb_workers),
- queue_cond_mutex(),
- queue_cond(),
tasks_todo_mutex(),
+ tasks_todo_cond(),
tasks_done_mutex(),
+ tasks_done_cond(),
notify(n)
{create_workers();}
- /// Tests if there are more tasks to execute from the task queue.
- ///
- /// @return true iff there are more tasks to execute.
- bool
- more_tasks_to_execute() const
- {
- pthread_mutex_lock(&tasks_todo_mutex);
- bool result = !tasks_todo.empty();
- pthread_mutex_unlock(&tasks_todo_mutex);
- return result;
- }
-
/// Create the worker threads pool and have all threads sit idle,
/// waiting for a task to be added to the todo queue.
void
@@ -258,10 +206,7 @@ struct queue::priv
pthread_mutex_lock(&tasks_todo_mutex);
tasks_todo.push(t);
pthread_mutex_unlock(&tasks_todo_mutex);
-
- pthread_mutex_lock(&queue_cond_mutex);
- pthread_cond_signal(&queue_cond);
- pthread_mutex_unlock(&queue_cond_mutex);
+ pthread_cond_signal(&tasks_todo_cond);
return true;
}
@@ -301,13 +246,17 @@ struct queue::priv
if (workers.empty())
return;
- // Acquire the mutex that protects the queue condition variable
- // (queue_cond) and wake up all the workers that are sleeping on
- // the condition.
- pthread_mutex_lock(&queue_cond_mutex);
+ // Wait for the todo list to be empty to make sure all tasks got picked up
+ pthread_mutex_lock(&tasks_todo_mutex);
+ while (!tasks_todo.empty()) {
+ pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
+ }
+ pthread_mutex_unlock(&tasks_todo_mutex);
+
+ // Now that the task queue is empty, drain the workers by waking them up,
+ // letting them finish their final task before termination.
bring_workers_down = true;
- ABG_ASSERT(pthread_cond_broadcast(&queue_cond) == 0);
- pthread_mutex_unlock(&queue_cond_mutex);
+ ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
for (std::vector<worker>::const_iterator i = workers.begin();
i != workers.end();
@@ -322,6 +271,9 @@ struct queue::priv
}; //end struct queue::priv
+// default initialize the default notifier.
+queue::task_done_notify queue::priv::default_notify;
+
/// Default constructor of the @ref queue type.
///
/// By default the queue is created with a number of worker threaders
@@ -438,20 +390,17 @@ queue::task_done_notify::operator()(const task_sptr&/*task_done*/)
queue::priv*
worker::wait_to_execute_a_task(queue::priv* p)
{
-
- pthread_mutex_lock(&p->queue_cond_mutex);
-
do
{
+ pthread_mutex_lock(&p->tasks_todo_mutex);
// If there is no more tasks to perform and the queue is not to
// be brought down then wait (sleep) for new tasks to come up.
- while (!p->more_tasks_to_execute() && !p->bring_workers_down)
- pthread_cond_wait(&p->queue_cond, &p->queue_cond_mutex);
+ while (p->tasks_todo.empty() && !p->bring_workers_down)
+ pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
// We were woken up. So maybe there are tasks to perform? If
// so, get a task from the queue ...
task_sptr t;
- pthread_mutex_lock(&p->tasks_todo_mutex);
if (!p->tasks_todo.empty())
{
t = p->tasks_todo.front();
@@ -477,11 +426,10 @@ worker::wait_to_execute_a_task(queue::priv* p)
p->tasks_done.push_back(t);
p->notify(t);
pthread_mutex_unlock(&p->tasks_done_mutex);
+ pthread_cond_signal(&p->tasks_done_cond);
}
}
- while (!p->bring_workers_down || p->more_tasks_to_execute());
-
- pthread_mutex_unlock(&p->queue_cond_mutex);
+ while (!p->bring_workers_down);
return p;
}