From patchwork Wed Jan 1 00:00:00 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Aleksei Vetrov via Libabigail X-Patchwork-Id: 39016 X-Patchwork-Original-From: libabigail@sourceware.org (Matthias Maennich via libabigail) From: Aleksei Vetrov via Libabigail Date: Wed, 01 Jan 2020 00:00:00 -0000 Subject: [PATCH] abg-workers: Rework the worker queue to improve concurrent behaviour Message-ID: <20200204130549.215023-1-maennich@google.com> This patch refactors the abigail::workers::queue and abigail::workers::worker implementations to avoid holding locking primitives longer than necessary. In particular, the queue_cond_mutex was held during the entire worker runtime, effectively serializing the workers. Hence, use a mutex+cond pair for each of the input and output queues, and only synchronize around the interaction with the corresponding queue. The tasks_todo_(mutex|cond) are meant to synchronize scheduling and distribution of work among workers, while tasks_done_(mutex|cond) are used for synchronizing threads when putting back the tasks to the output queue and to hold back threads waiting for the queue and workers to drain. Along the way, I did some cleanup that was now possible. - Move entire implementation of abigail::workers::task into header. - Make default_notify a static member. - Replace the multiple constructors with a single constructor that has default arguments. * include/abg-workers.h (workers::task): Move entire implementation to header and drop superfluous forward declaration. * src/abg-workers.cc (workers::task): Likewise. (workers::queue::priv): Drop queue_cond_mutex, rename queue_cond to tasks_todo_cond, add tasks_done_cond, make default_notify static. (workers::queue::priv::priv): Add default arguments to fully qualified constructor, drop the remaining ones. (workers::queue::priv::more_tasks_to_execute): Drop method. 
(workers::queue::priv::schedule_task): Only lock tasks_todo_mutex around the access to the queue; signal the condition variable without holding a mutex. (do_bring_workers_down): Likewise. Also wait on tasks_done_cond until tasks_todo is empty. (workers::worker::wait_to_execute_a_task): Wait for tasks on tasks_todo using tasks_todo_(cond|mutex) and signal task completion on tasks_done_cond. Signed-off-by: Matthias Maennich --- include/abg-workers.h | 8 +-- src/abg-workers.cc | 124 ++++++++++++------------------------------ 2 files changed, 39 insertions(+), 93 deletions(-) diff --git a/include/abg-workers.h b/include/abg-workers.h index 2892837b631e..ccd547a84270 100644 --- a/include/abg-workers.h +++ b/include/abg-workers.h @@ -45,9 +45,6 @@ namespace abigail namespace workers { -class task; -typedef shared_ptr task_sptr; - size_t get_number_of_threads(); /// This represents a task to be performed. @@ -60,13 +57,14 @@ size_t get_number_of_threads(); class task { public: - task(); virtual void perform() = 0; - virtual ~task(); + virtual ~task(){}; }; // end class task. +typedef shared_ptr task_sptr; + /// This represents a queue of tasks to be performed. /// /// Tasks are performed by a number of worker threads. diff --git a/src/abg-workers.cc b/src/abg-workers.cc index 4771a2050f0d..8af97bafd355 100644 --- a/src/abg-workers.cc +++ b/src/abg-workers.cc @@ -90,23 +90,6 @@ size_t get_number_of_threads() {return sysconf(_SC_NPROCESSORS_ONLN);} -// - -/// The default constructor of the @ref task type -task::task() -{} - -/// Destructor of the @ref task type. -task::~task() -{} -// - -// -struct worker; - -/// A convenience typedef for a shared_ptr to the @ref worker type. -typedef shared_ptr worker_sptr; - /// The abstraction of a worker thread. /// /// This is an implementation detail of the @ref queue public @@ -135,63 +118,39 @@ struct queue::priv bool bring_workers_down; // The number of worker threads. size_t num_workers; - // The mutex associated to the queue condition variable below. 
- pthread_mutex_t queue_cond_mutex; + // A mutex that protects the todo tasks queue from being accessed in + // read/write by two threads at the same time. + pthread_mutex_t tasks_todo_mutex; // The queue condition variable. This condition is used to make the // worker threads sleep until a new task is added to the queue of // todo tasks. Whenever a new task is added to that queue, a signal - // is sent to all the threads sleeping on this condition variable, - // and only one of them wakes up and takes the mutex - // queue_cond_mutex above. - pthread_cond_t queue_cond; - // A mutex that protects the todo tasks queue from being accessed in - // read/write by two threads at the same time. - mutable pthread_mutex_t tasks_todo_mutex; + // is sent to a thread sleeping on this condition variable. + pthread_cond_t tasks_todo_cond; // A mutex that protects the done tasks queue from being accessed in // read/write by two threads at the same time. pthread_mutex_t tasks_done_mutex; + // A condition to be signalled whenever a task is done. It is + // used to wait for task completion when bringing the workers down. + pthread_cond_t tasks_done_cond; + // The todo task queue itself. - std::queue tasks_todo; + std::queue tasks_todo; // The done task queue itself. std::vector tasks_done; + // This functor is invoked to notify the user of this queue that a // task has been completed and has been added to the done tasks // vector. We call it a notifier. This notifier is the default // notifier of the work queue; the one that is used when the user // has specified no notifier. It basically does nothing. - task_done_notify default_notify; + static task_done_notify default_notify; // This is a reference to the the notifier that is actually used in // the queue. It's either the one specified by the user or the // default one. - task_done_notify& notify; + task_done_notify& notify; // A vector of the worker threads. 
std::vector workers; - /// The default constructor of @ref queue::priv. - priv() - : bring_workers_down(), - num_workers(get_number_of_threads()), - queue_cond_mutex(), - queue_cond(), - tasks_todo_mutex(), - tasks_done_mutex(), - notify(default_notify) - {create_workers();} - - /// A constructor of @ref queue::priv. - /// - /// @param nb_workers the number of worker threads to have in the - /// thread pool. - priv(size_t nb_workers) - : bring_workers_down(), - num_workers(nb_workers), - queue_cond_mutex(), - queue_cond(), - tasks_todo_mutex(), - tasks_done_mutex(), - notify(default_notify) - {create_workers();} - /// A constructor of @ref queue::priv. /// /// @param nb_workers the number of worker threads to have in the @@ -200,28 +159,17 @@ struct queue::priv /// @param task_done_notify a functor object that is invoked by the /// worker thread which has performed the task, right after it's /// added that task to the vector of the done tasks. - priv(size_t nb_workers, task_done_notify& n) + priv(size_t nb_workers = get_number_of_threads(), + task_done_notify& n = default_notify) : bring_workers_down(), num_workers(nb_workers), - queue_cond_mutex(), - queue_cond(), tasks_todo_mutex(), + tasks_todo_cond(), tasks_done_mutex(), + tasks_done_cond(), notify(n) {create_workers();} - /// Tests if there are more tasks to execute from the task queue. - /// - /// @return true iff there are more tasks to execute. - bool - more_tasks_to_execute() const - { - pthread_mutex_lock(&tasks_todo_mutex); - bool result = !tasks_todo.empty(); - pthread_mutex_unlock(&tasks_todo_mutex); - return result; - } - /// Create the worker threads pool and have all threads sit idle, /// waiting for a task to be added to the todo queue. 
void @@ -258,10 +206,7 @@ struct queue::priv pthread_mutex_lock(&tasks_todo_mutex); tasks_todo.push(t); pthread_mutex_unlock(&tasks_todo_mutex); - - pthread_mutex_lock(&queue_cond_mutex); - pthread_cond_signal(&queue_cond); - pthread_mutex_unlock(&queue_cond_mutex); + pthread_cond_signal(&tasks_todo_cond); return true; } @@ -301,13 +246,17 @@ struct queue::priv if (workers.empty()) return; - // Acquire the mutex that protects the queue condition variable - // (queue_cond) and wake up all the workers that are sleeping on - // the condition. - pthread_mutex_lock(&queue_cond_mutex); + // Wait for the todo list to be empty to make sure all tasks got picked up + pthread_mutex_lock(&tasks_todo_mutex); + while (!tasks_todo.empty()) { + pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex); + } + pthread_mutex_unlock(&tasks_todo_mutex); + + // Now that the task queue is empty, drain the workers by waking them up, + // letting them finish their final task before termination. bring_workers_down = true; - ABG_ASSERT(pthread_cond_broadcast(&queue_cond) == 0); - pthread_mutex_unlock(&queue_cond_mutex); + ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0); for (std::vector::const_iterator i = workers.begin(); i != workers.end(); @@ -322,6 +271,9 @@ struct queue::priv }; //end struct queue::priv +// default initialize the default notifier. +queue::task_done_notify queue::priv::default_notify; + /// Default constructor of the @ref queue type. /// /// By default the queue is created with a number of worker threaders @@ -438,20 +390,17 @@ queue::task_done_notify::operator()(const task_sptr&/*task_done*/) queue::priv* worker::wait_to_execute_a_task(queue::priv* p) { - - pthread_mutex_lock(&p->queue_cond_mutex); - do { + pthread_mutex_lock(&p->tasks_todo_mutex); // If there is no more tasks to perform and the queue is not to // be brought down then wait (sleep) for new tasks to come up. 
- while (!p->more_tasks_to_execute() && !p->bring_workers_down) - pthread_cond_wait(&p->queue_cond, &p->queue_cond_mutex); + while (p->tasks_todo.empty() && !p->bring_workers_down) + pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex); // We were woken up. So maybe there are tasks to perform? If // so, get a task from the queue ... task_sptr t; - pthread_mutex_lock(&p->tasks_todo_mutex); if (!p->tasks_todo.empty()) { t = p->tasks_todo.front(); @@ -477,11 +426,10 @@ worker::wait_to_execute_a_task(queue::priv* p) p->tasks_done.push_back(t); p->notify(t); pthread_mutex_unlock(&p->tasks_done_mutex); + pthread_cond_signal(&p->tasks_done_cond); } } - while (!p->bring_workers_down || p->more_tasks_to_execute()); - - pthread_mutex_unlock(&p->queue_cond_mutex); + while (!p->bring_workers_down); return p; }