Rewrite workqueue. This version eliminates the master thread, and

reduces the amount of locking required to find a new thread to run.
This commit is contained in:
Ian Lance Taylor
2007-12-14 19:00:21 +00:00
parent 7004837e8d
commit 17a1d0a9b2
33 changed files with 1491 additions and 1433 deletions

View File

@@ -44,7 +44,7 @@ namespace gold
class Workqueue_thread
{
public:
Workqueue_thread(Workqueue_runner_threadpool*);
Workqueue_thread(Workqueue_threader_threadpool*, int thread_number);
~Workqueue_thread();
@@ -62,20 +62,19 @@ class Workqueue_thread
static void*
thread_body(void*);
// The main loop of the thread.
void
run();
// A pointer to the threadpool that this thread is part of.
Workqueue_runner_threadpool* threadpool_;
Workqueue_threader_threadpool* threadpool_;
// The thread number.
int thread_number_;
// The thread ID.
pthread_t tid_;
};
// Create the thread in the constructor.
Workqueue_thread::Workqueue_thread(Workqueue_runner_threadpool* threadpool)
: threadpool_(threadpool)
Workqueue_thread::Workqueue_thread(Workqueue_threader_threadpool* threadpool,
int thread_number)
: threadpool_(threadpool), thread_number_(thread_number)
{
pthread_attr_t attr;
int err = pthread_attr_init(&attr);
@@ -114,7 +113,8 @@ void*
Workqueue_thread::thread_body(void* arg)
{
Workqueue_thread* pwt = reinterpret_cast<Workqueue_thread*>(arg);
pwt->run();
pwt->threadpool_->process(pwt->thread_number_);
// Delete the thread object as we exit.
delete pwt;
@@ -122,144 +122,75 @@ Workqueue_thread::thread_body(void* arg)
return NULL;
}
// This is the main loop of a worker thread. It picks up a new Task
// and runs it.
void
Workqueue_thread::run()
{
Workqueue_runner_threadpool* threadpool = this->threadpool_;
Workqueue* workqueue = threadpool->get_workqueue();
while (true)
{
Task* t;
Task_locker* tl;
if (!threadpool->get_next(&t, &tl))
return;
gold_debug(DEBUG_TASK, "running task %s", t->name().c_str());
t->run(workqueue);
threadpool->thread_completed(t, tl);
}
}
// Class Workqueue_runner_threadpool.
// Class Workqueue_threader_threadpool.
// Constructor.
Workqueue_runner_threadpool::Workqueue_runner_threadpool(Workqueue* workqueue)
: Workqueue_runner(workqueue),
desired_thread_count_(0),
Workqueue_threader_threadpool::Workqueue_threader_threadpool(
Workqueue* workqueue)
: Workqueue_threader(workqueue),
check_thread_count_(0),
lock_(),
actual_thread_count_(0),
running_thread_count_(0),
task_queue_(),
task_queue_condvar_(this->lock_)
desired_thread_count_(1),
threads_(1)
{
}
// Destructor.
Workqueue_runner_threadpool::~Workqueue_runner_threadpool()
Workqueue_threader_threadpool::~Workqueue_threader_threadpool()
{
// Tell the threads to exit.
Hold_lock hl(this->lock_);
this->desired_thread_count_ = 0;
this->task_queue_condvar_.broadcast();
this->get_workqueue()->set_thread_count(0);
}
// Run a task. This doesn't actually run the task: it pushes it on
// the queue of tasks to run. This is always called in the main
// thread.
// Set the thread count.
void
Workqueue_runner_threadpool::run(Task* t, Task_locker* tl)
{
Hold_lock hl(this->lock_);
// This is where we create threads as needed, subject to the limit
// of the desired thread count.
gold_assert(this->desired_thread_count_ > 0);
gold_assert(this->actual_thread_count_ >= this->running_thread_count_);
if (this->actual_thread_count_ == this->running_thread_count_
&& this->actual_thread_count_ < this->desired_thread_count_)
{
// Note that threads delete themselves when they exit, so we
// don't keep pointers to them.
new Workqueue_thread(this);
++this->actual_thread_count_;
}
this->task_queue_.push(std::make_pair(t, tl));
this->task_queue_condvar_.signal();
}
// Set the thread count. This is only called in the main thread, and
// is only called when there are no threads running.
void
Workqueue_runner_threadpool::set_thread_count(int thread_count)
{
gold_assert(this->running_thread_count_ <= 1);
gold_assert(thread_count > 0);
this->desired_thread_count_ = thread_count;
}
// Get the next task to run. This is always called by an instance of
// Workqueue_thread, and is never called in the main thread. It
// returns false if the calling thread should exit.
bool
Workqueue_runner_threadpool::get_next(Task** pt, Task_locker** ptl)
{
Hold_lock hl(this->lock_);
// This is where we destroy threads, by telling them to exit.
gold_assert(this->actual_thread_count_ > this->running_thread_count_);
if (this->actual_thread_count_ > this->desired_thread_count_)
{
--this->actual_thread_count_;
return false;
}
while (this->task_queue_.empty() && this->desired_thread_count_ > 0)
{
// Wait for a new task to become available.
this->task_queue_condvar_.wait();
}
// Check whether we are exiting.
if (this->desired_thread_count_ == 0)
{
gold_assert(this->actual_thread_count_ > 0);
--this->actual_thread_count_;
return false;
}
*pt = this->task_queue_.front().first;
*ptl = this->task_queue_.front().second;
this->task_queue_.pop();
++this->running_thread_count_;
return true;
}
// This is called when a thread completes its task.
void
Workqueue_runner_threadpool::thread_completed(Task* t, Task_locker* tl)
Workqueue_threader_threadpool::set_thread_count(int thread_count)
{
int create;
{
Hold_lock hl(this->lock_);
gold_assert(this->actual_thread_count_ > 0);
gold_assert(this->running_thread_count_ > 0);
--this->running_thread_count_;
this->desired_thread_count_ = thread_count;
create = this->desired_thread_count_ - this->threads_;
if (create < 0)
this->check_thread_count_ = 1;
}
this->completed(t, tl);
if (create > 0)
{
for (int i = 0; i < create; ++i)
{
// Note that threads delete themselves when they exit, so we
// don't keep pointers to them.
new Workqueue_thread(this, this->threads_);
++this->threads_;
}
}
}
// Return whether the current thread should be cancelled.
bool
Workqueue_threader_threadpool::should_cancel_thread()
{
// Fast exit without taking a lock.
if (!this->check_thread_count_)
return false;
{
Hold_lock hl(this->lock_);
if (this->threads_ > this->desired_thread_count_)
{
--this->threads_;
return true;
}
this->check_thread_count_ = 0;
}
return false;
}
} // End namespace gold.