ConcurrencyModels.hpp
// Copyright Oliver J. Rosten 2018. //
// Distributed under the GNU GENERAL PUBLIC LICENSE, Version 3.0. //
// (See accompanying file LICENSE.md or copy at //
// https://www.gnu.org/licenses/gpl-3.0.en.html) //

#pragma once

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>

namespace sequoia::concurrency
{

  template<class R, class Task=std::packaged_task<R()>, class Q=std::queue<Task>>
  class task_queue
  {
  public:
    using task_t = Task;

    task_queue() = default;
    task_queue(const task_queue&) = delete;
    task_queue(task_queue&&) = delete;

    ~task_queue() = default;

    task_queue& operator=(const task_queue&) = delete;
    task_queue& operator=(task_queue&&) = delete;

    // Signal that no further tasks will be pushed and wake all waiting consumers.
    void finish()
    {
      {
        std::scoped_lock<std::mutex> lock{m_Mutex};
        m_Finished = true;
      }

      m_CV.notify_all();
    }

    void push(task_t&& task)
    {
      {
        std::scoped_lock<std::mutex> lock{m_Mutex};
        m_Q.push(std::move(task));
      }

      m_CV.notify_one();
    }

    // Non-blocking push: returns false if the mutex could not be acquired.
    [[nodiscard]]
    bool push(task_t&& task, std::try_to_lock_t t)
    {
      if(std::unique_lock<std::mutex> lock{m_Mutex, t}; lock)
      {
        m_Q.push(std::move(task));
      }
      else
      {
        return false;
      }

      m_CV.notify_one();

      return true;
    }

    // Non-blocking pop: returns an invalid task if the lock cannot be acquired
    // (or if the queue is empty).
    [[nodiscard]]
    task_t pop(std::try_to_lock_t t)
    {
      if(std::unique_lock<std::mutex> lock{m_Mutex, t}; lock)
      {
        return get();
      }

      return Task{};
    }

    // Blocking pop: waits until a task is available or finish() has been called;
    // once finished and drained, an invalid task is returned.
    [[nodiscard]]
    task_t pop()
    {
      std::unique_lock<std::mutex> lock{m_Mutex};
      while(m_Q.empty() && !m_Finished) m_CV.wait(lock);

      return get();
    }
  private:
    Q m_Q;
    std::mutex m_Mutex;
    std::condition_variable m_CV;
    bool m_Finished{};

    [[nodiscard]]
    Task get()
    {
      if(!m_Q.empty())
      {
        Task task{std::move(m_Q.front())};
        m_Q.pop();
        return task;
      }

      return {};
    }
  };

  namespace impl
  {
    // Selects the queue layout for thread_pool: a queue per thread when
    // MultiChannel is true, otherwise a single shared queue.
    template<class R, bool MultiChannel> struct queue_details
    {
      using Q_t = task_queue<R>;
      using task_t = typename Q_t::task_t;
      using queue_type = std::vector<Q_t>;

      std::size_t push_cycles{};
    };

    template<class R> struct queue_details<R, false>
    {
      using Q_t = task_queue<R>;
      using task_t = typename Q_t::task_t;
      using queue_type = Q_t;
    };
  }

  //===================================Serial Execution Model===================================//

  template<class R> class serial
  {
  public:
    using return_type = R;

    template<class Fn, class... Args>
      requires std::invocable<Fn, Args...> && std::is_convertible_v<R, std::invoke_result_t<Fn, Args...>>
    [[nodiscard]]
    constexpr std::invoke_result_t<Fn, Args...> push(Fn&& fn, Args&&... args)
    {
      return std::forward<Fn>(fn)(std::forward<Args>(args)...);
    }
  };

  //==================================Asynchronous Execution==================================//

  template<class R>
  class asynchronous
  {
  public:
    using return_type = R;

    asynchronous() = default;
    asynchronous(const asynchronous&) = delete;
    asynchronous(asynchronous&&) noexcept = default;
    ~asynchronous() = default;

    asynchronous& operator=(const asynchronous&) = delete;
    asynchronous& operator=(asynchronous&&) noexcept = default;

    template<class Fn, class... Args>
    [[nodiscard]]
    std::future<R> push(Fn&& fn, Args&&... args)
    {
      return std::async(std::launch::async | std::launch::deferred, std::forward<Fn>(fn), std::forward<Args>(args)...);
    }
  };

  //=======================================Thread Pool========================================//

  template<class R, bool MultiPipeline=true>
  class thread_pool : private impl::queue_details<R, MultiPipeline>
  {
  public:
    using return_type = R;

    explicit thread_pool(const std::size_t numThreads)
      requires(!MultiPipeline)
    {
      make_pool(numThreads);
    }

    thread_pool(const std::size_t numThreads, const std::size_t pushCycles = 46)
      requires MultiPipeline
      : impl::queue_details<R, MultiPipeline>{pushCycles}
      , m_Queues(numThreads)
    {
      make_pool(numThreads);
    }

    thread_pool(const thread_pool&) = delete;
    thread_pool(thread_pool&&) = delete;

    ~thread_pool()
    {
      if(!joined) join_all();
    }

    thread_pool& operator=(const thread_pool&) = delete;
    thread_pool& operator=(thread_pool&&) = delete;

    template<std::invocable Fn>
      requires std::is_convertible_v<std::invoke_result_t<Fn>, R> && std::move_constructible<Fn>
    [[nodiscard]]
    std::future<R> push(Fn fn)
    {
      task_t task{std::move(fn)};
      std::future<R> f{task.get_future()};

      if constexpr(MultiPipeline)
      {
        // Queues are chosen round-robin; once every queue has received a task,
        // up to N * push_cycles non-blocking pushes are attempted before
        // falling back to a blocking push on the chosen queue.
        const auto qIndex{m_QueueIndex++};
        const auto N{m_Threads.size()};

        if(qIndex >= N)
        {
          for(std::size_t i{}; i < N * this->push_cycles; ++i)
          {
            if(m_Queues[(qIndex + i) % N].push(std::move(task), std::try_to_lock))
              return f;
          }
        }

        m_Queues[qIndex % N].push(std::move(task));
      }
      else
      {
        m_Queues.push(std::move(task));
      }

      return f;
    }

    template<class Fn, class... Args>
      requires std::invocable<Fn, Args...>
            && std::is_convertible_v<std::invoke_result_t<Fn, Args...>, R>
            && std::move_constructible<Fn>
            && (std::move_constructible<Args> && ...)
    [[nodiscard]]
    std::future<R> push(Fn fn, Args... args)
    {
      return push([fn = std::move(fn), ...args = std::move(args)](){ return fn(args...); });
    }

    void join()
    {
      join_all();
      joined = true;
    }
  private:
    using task_t = typename impl::queue_details<R, MultiPipeline>::task_t;
    using Queues_t = typename impl::queue_details<R, MultiPipeline>::queue_type;

    Queues_t m_Queues;
    std::vector<std::thread> m_Threads;
    bool joined{};

    std::size_t m_QueueIndex{};

    void make_pool(const std::size_t numThreads)
    {
      m_Threads.reserve(numThreads);

      for(std::size_t q{}; q<numThreads; ++q)
      {
        auto loop{[=,this]() {
          if constexpr(MultiPipeline)
          {
            // Each worker begins with a blocking pop on its own queue; an
            // invalid task indicates the pool has already been finished.
            task_t task{m_Queues[q].pop()};
            if(task.valid())
              task();
            else
              return;
          }

          while(true)
          {
            task_t task{};

            if constexpr(MultiPipeline)
            {
              // Try to steal work from any queue without blocking, then fall
              // back to a blocking pop on the worker's own queue.
              const auto N{m_Threads.size()};
              for(std::size_t i{}; i<N; ++i)
              {
                task = m_Queues[(q+i) % N].pop(std::try_to_lock);
                if(task.valid()) break;
              }

              if(!task.valid())
                task = m_Queues[q].pop();
            }
            else
            {
              task = m_Queues.pop();
            }

            // An invalid task signals that finish() has been called and the
            // queues are drained, so the worker exits.
            if(task.valid())
              task();
            else
              break;
          }
        }
        };

        m_Threads.emplace_back(loop);
      }
    }

    void join_all()
    {
      // Wake all workers so they can drain the queues and exit, then join them.
      if constexpr(MultiPipeline)
        for(auto& q : m_Queues) q.finish();
      else
        m_Queues.finish();

      for(auto& t : m_Threads) t.join();
    }
  };
}

Class summaries:

asynchronous: tasks may be pushed, upon which they are fed to std::async.
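
A minimal usage sketch (an assumption, not taken from the source; the demo function and values are illustrative): the callable and its arguments are forwarded to std::async and the result is read from the returned future.

#include <future>
// #include "ConcurrencyModels.hpp" // assumed include path

int asynchronous_demo()
{
  sequoia::concurrency::asynchronous<int> model{};

  // push forwards the callable and its arguments to std::async, returning a std::future<int>.
  std::future<int> f{model.push([](int x, int y) { return x + y; }, 2, 3)};

  return f.get(); // 5
}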

serial: tasks may be pushed, upon which they are immediately invoked.
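
A corresponding sketch for the serial model (again an assumption, with an illustrative function name): push invokes the callable on the calling thread and returns its result directly, so no future is involved.

// #include "ConcurrencyModels.hpp" // assumed include path

int serial_demo()
{
  sequoia::concurrency::serial<int> model{};

  // Invoked immediately on the calling thread.
  return model.push([](int x) { return x * x; }, 4); // 16
}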

task_queue: a task queue designed for use by multiple threads.
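
A producer/consumer sketch (assumed usage; the demo function is not part of the library): a consumer thread pops and runs tasks until finish() has been called and the queue is drained, at which point pop() yields an invalid task.

#include <future>
#include <thread>
// #include "ConcurrencyModels.hpp" // assumed include path

void task_queue_demo()
{
  using queue_t = sequoia::concurrency::task_queue<int>;
  queue_t q{};

  std::thread consumer{[&q]() {
    // An invalid (default-constructed) task signals shutdown.
    for(auto task{q.pop()}; task.valid(); task = q.pop())
      task();
  }};

  queue_t::task_t task{[]() { return 42; }};
  std::future<int> f{task.get_future()};
  q.push(std::move(task));

  f.get(); // blocks until the consumer has executed the task; yields 42

  q.finish();
  consumer.join();
}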

thread_pool: supports either a single pipeline or a pipeline for each thread, together with task stealing.
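
A sketch of pool usage (assumed; the thread count, task count and demo function are illustrative): with MultiPipeline left at its default of true, each of the four worker threads owns a queue and idle workers steal from their neighbours.

#include <future>
#include <vector>
// #include "ConcurrencyModels.hpp" // assumed include path

int thread_pool_demo()
{
  sequoia::concurrency::thread_pool<int> pool{4u};

  std::vector<std::future<int>> futures{};
  for(int i{}; i < 8; ++i)
    futures.push_back(pool.push([i]() { return i * i; }));

  int total{};
  for(auto& f : futures) total += f.get();

  pool.join(); // otherwise the destructor joins the workers
  return total; // 0 + 1 + 4 + ... + 49 == 140
}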