diff --git a/gdb/unittests/parallel-for-selftests.c b/gdb/unittests/parallel-for-selftests.c index 54556c9861c..b69eed43d5c 100644 --- a/gdb/unittests/parallel-for-selftests.c +++ b/gdb/unittests/parallel-for-selftests.c @@ -120,12 +120,33 @@ test_parallel_for_each () const std::vector for_each_functions { + /* Test gdb::parallel_for_each. */ [] (int *start, int *end, foreach_callback_t callback) - { gdb::parallel_for_each<1, int *, test_worker> (start, end, callback, - 0); }, + { gdb::parallel_for_each<1, int *, test_worker> (start, end, callback, 0); }, + + /* Test gdb::parallel_for_each_async. The DONE callback captures the locals below by reference; that is safe because this lambda does not return until DONE_FLAG has been set. */ [] (int *start, int *end, foreach_callback_t callback) - { gdb::sequential_for_each (start, end, callback, - 0); }, + { + bool done_flag = false; + std::condition_variable cv; + std::mutex mtx; + + gdb::parallel_for_each_async<1, int *, test_worker> (start, end, + [&mtx, &done_flag, &cv] () + { + std::lock_guard lock (mtx); + done_flag = true; + cv.notify_one(); + }, callback, 0); + + /* Wait for the async parallel-for to complete. Waiting with a predicate covers spurious wakeups and the case where the DONE callback already ran before we got here. */ + std::unique_lock lock (mtx); + cv.wait (lock, [&done_flag] () { return done_flag; }); + }, + + /* Test gdb::sequential_for_each. */ + [] (int *start, int *end, foreach_callback_t callback) + { gdb::sequential_for_each (start, end, callback, 0); }, }; int default_thread_count = gdb::thread_pool::g_thread_pool->thread_count (); diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h index 7baa768c047..7f3fef96c9c 100644 --- a/gdbsupport/parallel-for.h +++ b/gdbsupport/parallel-for.h @@ -30,6 +30,10 @@ namespace gdb { +/* If enabled, print debug info about the inner workings of the parallel for + each functions. Defined at namespace scope so that the parallel-for variants in this file can all test it. */ +constexpr bool parallel_for_each_debug = false; + /* A "parallel-for" implementation using a shared work queue. Work items get popped in batches of size up to BATCH_SIZE from the queue and handed out to worker threads. @@ -39,7 +43,10 @@ namespace gdb thread state. 
Worker threads call Worker::operator() repeatedly until the queue is - empty. */ + empty. + + This function is synchronous, meaning that it blocks and returns once the + processing is complete. */ template @@ -47,10 +54,6 @@ void parallel_for_each (const RandomIt first, const RandomIt last, WorkerArgs &&...worker_args) { - /* If enabled, print debug info about how the work is distributed across - the threads. */ - const bool parallel_for_each_debug = false; - gdb_assert (first <= last); if (parallel_for_each_debug) @@ -122,6 +125,134 @@ sequential_for_each (RandomIt first, RandomIt last, WorkerArgs &&...worker_args) Worker (std::forward (worker_args)...) ({ first, last }); } +namespace detail +{ + +/* Type to hold the state shared between threads of + gdb::parallel_for_each_async. Destroying it invokes the DONE callback, if one was given. */ + +template +struct pfea_state +{ + pfea_state (RandomIt first, RandomIt last, std::function &&done, + WorkerArgs &&...worker_args) + : first (first), + last (last), + worker_args_tuple (std::forward_as_tuple + (std::forward (worker_args)...)), + queue (first, last), + m_done (std::move (done)) + {} + + DISABLE_COPY_AND_ASSIGN (pfea_state); + + /* This gets called when the last reference on the shared state is dropped, + thus when the processing is complete. An empty M_DONE is skipped. NOTE(review): M_DONE runs from a destructor, so it presumably must not throw -- confirm. */ + ~pfea_state () + { + if (m_done) + m_done (); + } + + /* The interval to process. */ + const RandomIt first, last; + + /* Tuple of arguments to pass when constructing the user's worker object. + + Use std::decay_t to avoid storing references to the caller's local + variables. If we didn't use it and the caller passed an lvalue `foo *`, + we would store it as a reference to `foo *`, thus storing a reference to + the caller's local variable. + + The downside is that it's not possible to pass arguments by reference; + callers need to pass pointers or std::reference_wrappers instead. */ + std::tuple...> worker_args_tuple; + + /* Work queue that worker threads pull work items from. 
*/ + work_queue queue; + +private: + /* Callable invoked from the destructor once the parallel-for is done. May be empty. */ + std::function m_done; +}; + +} /* namespace detail */ + +/* A "parallel-for" implementation using a shared work queue. Work items get + popped in batches from the queue and handed out to worker threads. + + Batch sizes are proportional to the number of remaining items in the queue, + but always greater or equal to MIN_BATCH_SIZE. + + The DONE callback is invoked when processing is done. + + Each worker thread instantiates an object of type Worker, constructing it from + the decayed copies of WORKER_ARGS kept in the shared state. The Worker object can + be used to keep some per-worker thread state. This version does not support + passing references as arguments to the worker. Use std::reference_wrapper or pointers instead. + + Worker threads call Worker::operator() repeatedly until the queue is + empty. + + This function is asynchronous. The DONE callback is called by whichever thread drops the last reference on the shared state, + normally a worker thread. NOTE(review): the STATE and TASK locals of this function also hold references, so DONE may presumably run on the calling thread if every task has already finished -- confirm. */ + +template +void +parallel_for_each_async (const RandomIt first, const RandomIt last, + std::function &&done, + WorkerArgs &&...worker_args) +{ + gdb_assert (first <= last); + + if (parallel_for_each_debug) + { + debug_printf ("Parallel for: n elements: %zu\n", + static_cast (last - first)); + debug_printf ("Parallel for: min batch size: %zu\n", min_batch_size); + } + + const size_t n_worker_threads + = std::max (thread_pool::g_thread_pool->thread_count (), 1); + + /* The state shared between all worker threads. All worker threads get a + reference on the shared pointer through the lambda below. Whoever drops + the last reference (normally the last worker thread to finish) causes this object to be destroyed, which + will call the DONE callback. */ + using state_t = detail::pfea_state; + auto state + = std::make_shared (first, last, std::move (done), + std::forward (worker_args)...); + + /* The worker thread task. */ + auto task = [state] () + { + /* Instantiate the user-defined worker from the arguments stored in the shared state. 
*/ + auto worker = std::make_from_tuple (state->worker_args_tuple); + + for (;;) + { + const auto batch = state->queue.pop_batch (); + + if (batch.empty ()) + break; + + if (parallel_for_each_debug) + debug_printf ("Processing %zu items, range [%zu, %zu[\n", + batch.size (), + static_cast<size_t> (batch.begin () - state->first), + static_cast<size_t> (batch.end () - state->first)); + + worker (batch); + } + }; + + /* Start N_WORKER_THREADS tasks. The index is a size_t to match the type of N_WORKER_THREADS. */ + for (size_t i = 0; i < n_worker_threads; ++i) + gdb::thread_pool::g_thread_pool->post_task (task); +} + } #endif /* GDBSUPPORT_PARALLEL_FOR_H */