gdbsupport: factor out work queue from parallel-for.h

In preparation for a following patch that will re-use the shared work
queue algorithm, move it to a separate class.

Change-Id: Id05cf8898a5d162048fa8fa056fbf7e0441bfb78
Approved-By: Tom Tromey <tom@tromey.com>
This commit is contained in:
Simon Marchi
2025-09-19 16:27:03 -04:00
parent 0f5b90c2dc
commit 20e3867ad8
2 changed files with 108 additions and 32 deletions

View File

@@ -25,6 +25,7 @@
#include <tuple>
#include "gdbsupport/iterator-range.h"
#include "gdbsupport/thread-pool.h"
#include "gdbsupport/work-queue.h"
namespace gdb
{
@@ -59,12 +60,8 @@ parallel_for_each (const RandomIt first, const RandomIt last,
debug_printf ("Parallel for: batch size: %zu\n", batch_size);
}
const size_t n_worker_threads
= std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
std::vector<gdb::future<void>> results;
/* The next item to hand out. */
std::atomic<RandomIt> next = first;
work_queue<RandomIt, batch_size> queue (first, last);
/* The worker thread task.
@@ -77,49 +74,32 @@ parallel_for_each (const RandomIt first, const RandomIt last,
and `args` can be used as-is in the lambda. */
auto args_tuple
= std::forward_as_tuple (std::forward<WorkerArgs> (worker_args)...);
auto task = [&next, first, last, n_worker_threads, &args_tuple] ()
auto task = [&queue, first, &args_tuple] ()
{
/* Instantiate the user-defined worker. */
auto worker = std::make_from_tuple<Worker> (args_tuple);
for (;;)
{
/* Grab a snapshot of NEXT. */
auto local_next = next.load ();
gdb_assert (local_next <= last);
const auto batch = queue.pop_batch ();
/* Number of remaining items. */
auto n_remaining = last - local_next;
gdb_assert (n_remaining >= 0);
/* Are we done? */
if (n_remaining == 0)
if (batch.empty ())
break;
const auto this_batch_size
= std::min<std::size_t> (batch_size, n_remaining);
/* The range to process in this iteration. */
const auto this_batch_first = local_next;
const auto this_batch_last = local_next + this_batch_size;
/* Update NEXT. If the current value of NEXT doesn't match
LOCAL_NEXT, it means another thread updated it concurrently,
restart. */
if (!next.compare_exchange_weak (local_next, this_batch_last))
continue;
if (parallel_for_each_debug)
debug_printf ("Processing %zu items, range [%zu, %zu[\n",
this_batch_size,
static_cast<size_t> (this_batch_first - first),
static_cast<size_t> (this_batch_last - first));
batch.size (),
batch.begin () - first,
batch.end () - first);
worker ({ this_batch_first, this_batch_last });
worker (batch);
}
};
/* Start N_WORKER_THREADS tasks. */
const size_t n_worker_threads
= std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
for (int i = 0; i < n_worker_threads; ++i)
results.push_back (gdb::thread_pool::g_thread_pool->post_task (task));

96
gdbsupport/work-queue.h Normal file
View File

@@ -0,0 +1,96 @@
/* Synchronized work queue.
Copyright (C) 2025 Free Software Foundation, Inc.
This file is part of GDB.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#ifndef GDBSUPPORT_WORK_QUEUE_H
#define GDBSUPPORT_WORK_QUEUE_H
#include "gdbsupport/iterator-range.h"
namespace gdb
{
/* Implementation of a thread-safe work queue.
The work items are specified by two iterators of type RandomIt.
BATCH_SIZE is the number of work items to pop in a batch. */
template<typename RandomIt, std::size_t batch_size>
class work_queue
{
public:
/* The work items are specified by the range `[first, last[`. */
work_queue (const RandomIt first, const RandomIt last) noexcept
: m_next (first),
m_last (last)
{
gdb_assert (first <= last);
}
DISABLE_COPY_AND_ASSIGN (work_queue);
/* Pop a batch of work items.
The return value is an iterator range delimiting the work items. */
iterator_range<RandomIt> pop_batch () noexcept
{
for (;;)
{
/* Grab a snapshot of M_NEXT. */
auto next = m_next.load ();
gdb_assert (next <= m_last);
/* The number of items remaining in the queue. */
const auto n_remaining = static_cast<std::size_t> (m_last - next);
/* Are we done? */
if (n_remaining == 0)
return { m_last, m_last };
/* The batch size is proportional to the number of items remaining in
the queue. We do this to try to stike a balance, avoiding
synchronization overhead when there are many items to process at the
start, and avoiding workload imbalance when there are few items to
process at the end. */
const auto this_batch_size = std::min (batch_size, n_remaining);
/* The range of items in this batch. */
const auto this_batch_first = next;
const auto this_batch_last = next + this_batch_size;
/* Update M_NEXT. If the current value of M_NEXT doesn't match NEXT, it
means another thread updated it concurrently, restart. */
if (!m_next.compare_exchange_weak (next, this_batch_last))
continue;
return { this_batch_first, this_batch_last };
}
}
private:
/* The next work item to hand out. */
std::atomic<RandomIt> m_next;
/* The end of the work item range. */
RandomIt m_last;
};
} /* namespace gdb */
#endif /* GDBSUPPORT_WORK_QUEUE_H */