#include <condition_variable>
// (further standard headers used by this listing, e.g. <future>, <mutex>, <queue>, <thread> and <vector>, are included on lines not shown here)
namespace sequoia::concurrency
  // A task queue designed for use by multiple threads.
  template<class R, class Task=std::packaged_task<R()>, class Q=std::queue<Task>>
    // (a line from an elided member: the queue's mutex guards all state changes)
    std::scoped_lock<std::mutex> lock{m_Mutex};
    // Blocking push: take the lock and enqueue the task.
    void push(task_t&& task)
      std::scoped_lock<std::mutex> lock{m_Mutex};
      m_Q.push(std::move(task));
    // Non-blocking push: enqueue only if the mutex can be acquired without waiting,
    // reporting whether the task was accepted.
    bool push(task_t&& task, std::try_to_lock_t t)
      if(std::unique_lock<std::mutex> lock{m_Mutex, t}; lock)
        m_Q.push(std::move(task));
    // Non-blocking pop: hands back a default-constructed (invalid) task if the lock cannot be taken.
    task_t pop(std::try_to_lock_t t)
      if(std::unique_lock<std::mutex> lock{m_Mutex, t}; lock)
      // Blocking pop: wait until a task is available or the queue has been marked as finished.
      std::unique_lock<std::mutex> lock{m_Mutex};
      while(m_Q.empty() && !m_Finished) m_CV.wait(lock);
    std::condition_variable m_CV;
      // Transfer the front task out of the underlying queue.
      Task task{std::move(m_Q.front())};
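An illustrative sketch of how the queue above might be driven; it assumes the class is named task_queue (only its members appear in this excerpt) and that a drained, finished queue hands back an invalid task, in line with the valid() checks made by the thread pool further down.

// Sketch only: task_queue is an assumed name; push/pop/finish are as shown above.
#include <future>
#include <iostream>
#include <thread>

int main()
{
  sequoia::concurrency::task_queue<int> q{};

  std::thread consumer{[&q](){
    for(;;)
    {
      auto task{q.pop()};        // blocks until a task arrives or finish() is called
      if(!task.valid()) break;   // an invalid task indicates shutdown
      task();
    }
  }};

  std::packaged_task<int()> job{[](){ return 42; }};
  std::future<int> f{job.get_future()};
  q.push(std::move(job));
  std::cout << f.get() << '\n';  // prints 42

  q.finish();                    // wake the consumer so it can exit
  consumer.join();
}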
    // MultiPipeline configuration: one task queue per worker thread, plus the number of
    // cycles used when attempting a non-blocking push.
    using task_t     = typename Q_t::task_t;
    using queue_type = std::vector<Q_t>;

    std::size_t push_cycles{};
    // Single-pipeline case: the same task type over a single shared queue.
    using task_t = typename Q_t::task_t;
    // Model in which a pushed task is invoked immediately, returning its result directly.
    using return_type = R;

    template<class Fn, class... Args>
      requires std::invocable<Fn, Args...> && std::is_convertible_v<std::invoke_result_t<Fn, Args...>, R>
    constexpr std::invoke_result_t<Fn, Args...> push(Fn&& fn, Args&&... args)
    {
      return std::forward<Fn>(fn)(std::forward<Args>(args)...);
    }
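The member above belongs to the model summarised at the end of this listing as "tasks may be pushed, upon which they are immediately invoked"; its class name is not part of this excerpt. A self-contained sketch of the same idea, under the placeholder name serial_model:

// Sketch only: serial_model is a placeholder name, not the class defined in this header.
#include <concepts>
#include <type_traits>
#include <utility>

template<class R>
class serial_model
{
public:
  using return_type = R;

  template<class Fn, class... Args>
    requires std::invocable<Fn, Args...> && std::is_convertible_v<std::invoke_result_t<Fn, Args...>, R>
  constexpr std::invoke_result_t<Fn, Args...> push(Fn&& fn, Args&&... args)
  {
    return std::forward<Fn>(fn)(std::forward<Args>(args)...);
  }
};

int main()
{
  serial_model<int> model{};
  // push invokes the callable straight away and hands back its result
  return model.push([](int x, int y){ return x + y; }, 2, 3) == 5 ? 0 : 1;
}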
    // Model in which a pushed task is handed straight to std::async.
    using return_type = R;

    template<class Fn, class... Args>
    std::future<R> push(Fn&& fn, Args&&... args)
    {
      return std::async(std::launch::async | std::launch::deferred,
                        std::forward<Fn>(fn), std::forward<Args>(args)...);
    }
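The push above forwards its callable and arguments to std::async with both launch policies, so the library may run the task on a fresh thread or defer it until the returned future is waited upon. A self-contained sketch under the placeholder name async_model:

// Sketch only: async_model is a placeholder name, not the class defined in this header.
#include <future>
#include <utility>

template<class R>
class async_model
{
public:
  using return_type = R;

  template<class Fn, class... Args>
  std::future<R> push(Fn&& fn, Args&&... args)
  {
    // async | deferred leaves the choice between a new thread and lazy evaluation
    // to the implementation
    return std::async(std::launch::async | std::launch::deferred,
                      std::forward<Fn>(fn), std::forward<Args>(args)...);
  }
};

int main()
{
  async_model<int> model{};
  std::future<int> f{model.push([](int x, int y){ return x * y; }, 6, 7)};
  return f.get() == 42 ? 0 : 1;
}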
  // Supports either a single pipeline or a pipeline for each thread, together with task stealing.
  template<class R, bool MultiPipeline=true>
  class thread_pool
  {
  public:
    using return_type = R;

    thread_pool(const std::size_t numThreads)   // single-pipeline constructor (signature inferred)
      requires (!MultiPipeline)
    {
      make_pool(numThreads);
    }
    // Multi-pipeline constructor: one queue per thread; pushCycles bounds the number of
    // non-blocking (try_to_lock) push attempts made before falling back to a blocking push.
    thread_pool(const std::size_t numThreads, const std::size_t pushCycles = 46)
      requires MultiPipeline
        // (start of the initialiser list elided in this excerpt)
      , m_Queues(numThreads)
    {
      make_pool(numThreads);
    }
      if(!joined) join_all();   // join the worker threads unless this has already happened
    template<std::invocable Fn>
      requires std::is_convertible_v<std::invoke_result_t<Fn>, R> && std::move_constructible<Fn>
    std::future<R> push(Fn fn)
    {
      task_t task{std::move(fn)};
      std::future<R> f{task.get_future()};
      if constexpr(MultiPipeline)
      {
        // Distribute across the per-thread queues: up to N * push_cycles non-blocking
        // attempts, then fall back to a blocking push (some lines elided in this excerpt).
        const auto qIndex{m_QueueIndex++};
        const auto N{m_Threads.size()};
        for(std::size_t i{}; i < N * this->push_cycles; ++i)
        {
          if(m_Queues[(qIndex + i) % N].push(std::move(task), std::try_to_lock)) return f;
        }
        m_Queues[qIndex % N].push(std::move(task));
      }
      else
      {
        m_Queues.push(std::move(task));   // single pipeline: one shared queue
      }
      return f;
    }
    // Convenience overload: bind the arguments into a lambda and forward to the
    // single-callable push above.
    template<class Fn, class... Args>
      requires std::invocable<Fn, Args...>
            && std::is_convertible_v<std::invoke_result_t<Fn, Args...>, R>
            && std::move_constructible<Fn>
            && (std::move_constructible<Args> && ...)
    std::future<R> push(Fn fn, Args... args)
    {
      return push([fn = std::move(fn), ...args = std::move(args)](){ return fn(args...); });
    }
    using task_t   = typename impl::queue_details<R, MultiPipeline>::task_t;
    using Queues_t = typename impl::queue_details<R, MultiPipeline>::queue_type;

    Queues_t m_Queues;   // (declaration inferred from its use above)
    std::vector<std::thread> m_Threads;
    bool joined{};       // (declaration inferred from its use in the destructor above)
    std::size_t m_QueueIndex{};
    void make_pool(const std::size_t numThreads)
    {
      m_Threads.reserve(numThreads);

      for(std::size_t q{}; q<numThreads; ++q)
      {
        // Each worker runs this loop; in the multi-pipeline case, thread q first
        // blocks on its own queue.
        auto loop{[=,this]() {
          if constexpr(MultiPipeline)
          {
            task_t task{m_Queues[q].pop()};
            // Steal work: try every queue without blocking, then fall back to a
            // blocking pop on this thread's own queue.
            if constexpr(MultiPipeline)
            {
              const auto N{m_Threads.size()};
              for(std::size_t i{}; i<N; ++i)
              {
                task = m_Queues[(q+i) % N].pop(std::try_to_lock);
                if(task.valid()) break;
              }
              if(!task.valid()) task = m_Queues[q].pop();
            }
            else
            {
              task = m_Queues.pop();   // single pipeline: block on the shared queue
            }
        m_Threads.emplace_back(loop);   // launch a worker thread running the loop above
    void join_all()   // (signature inferred from the call in the destructor above)
    {
      if constexpr(MultiPipeline)
        for(auto& q : m_Queues) q.finish();   // wake any blocked pops so the workers can exit
      // (single-pipeline branch elided in this excerpt)

      for(auto& t : m_Threads) t.join();
    }
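A hedged usage sketch of the thread pool, assuming the header above has been included; the expectation that the workers are joined on destruction follows from the if(!joined) join_all(); line, which appears to belong to the destructor.

// Sketch only: exercises thread_pool as declared above.
#include <future>
#include <vector>

int main()
{
  sequoia::concurrency::thread_pool<int> pool{4};   // MultiPipeline defaults to true

  std::vector<std::future<int>> results{};
  for(int i{}; i < 8; ++i)
    results.push_back(pool.push([i](){ return i * i; }));

  int total{};
  for(auto& f : results) total += f.get();

  return total == 140 ? 0 : 1;   // 0 + 1 + 4 + ... + 49 == 140
}   // the pool's destructor joins its worker threads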
Summary of the entities documented in this listing:

  - ConcurrencyModels.hpp:37 - a task queue designed for use by multiple threads.
  - ConcurrencyModels.hpp:129 - the queue-details trait consumed by the thread pool.
  - ConcurrencyModels.hpp:150 - tasks may be pushed, upon which they are immediately invoked.
  - ConcurrencyModels.hpp:169 - tasks may be pushed, upon which they are fed to std::async.
  - ConcurrencyModels.hpp:197 - supports either a single pipeline or a pipeline for each thread, together with task stealing.

Also referenced: traits which are sufficiently general to appear in the sequoia namespace.