group2 0.1.0
CSE 125 Group 2
Loading...
Searching...
No Matches
Parallel.hpp
Go to the documentation of this file.
1
27
28#pragma once
29
30#include <SDL3/SDL.h>
31
32#include <algorithm>
33#include <atomic>
34#include <cstddef>
35#include <cstdlib>
36#include <iterator>
37#include <utility>
38
39#if defined(GROUP2_HAVE_TBB)
40#include <tbb/parallel_for_each.h>
41#else
42#include <condition_variable>
43#include <deque>
44#include <exception>
45#include <functional>
46#include <memory>
47#include <mutex>
48#include <thread>
49#include <vector>
50#endif
51
52namespace group2::perf
53{
54
/// @brief Master switch for parallel execution.
///
/// parallelFor() reads this flag on every call and only fans work out when it
/// is true; initParallelFromEnv() overwrites it from the environment.
inline std::atomic<bool> parallelEnabled{true};

/// @brief Minimum items below which parallelFor runs sequentially even when
///        the master switch is on.
inline constexpr std::size_t k_parallelThreshold = 8;
80
81#if !defined(GROUP2_HAVE_TBB)
82
/// @brief Per-thread marker: true while the current thread is executing a
///        chunk of a parallel kernel.
///
/// The std::thread fallback in parallelFor() refuses to fan out when this flag
/// is set on the calling thread, so a nested parallelFor() runs sequentially
/// instead of queueing more work behind the chunk that is waiting for it.
/// NOTE(review): presumably toggled by ParallelFlagGuard, which is declared
/// elsewhere in this header - confirm.
inline thread_local bool inParallelKernel = false;
84
94
/// @brief Decide how many worker threads the std::thread fallback pool starts.
///
/// The default is one less than std::thread::hardware_concurrency() (the
/// calling thread also runs a chunk), or 0 when the hardware reports at most
/// one thread. The environment variable GROUP2_SERVER_THREADS overrides the
/// default; the requested value is clamped to 64.
///
/// An unset, empty, non-numeric or negative value leaves the default in place.
///
/// @return Number of worker threads to spawn (0 means "run everything on the
///         calling thread").
inline unsigned fallbackWorkerCountFromEnv()
{
    const unsigned hw = std::thread::hardware_concurrency();
    const unsigned defaultCount = (hw > 1) ? (hw - 1) : 0;

    const char* p = std::getenv("GROUP2_SERVER_THREADS");
    if (p == nullptr || p[0] == '\0')
        return defaultCount;

    char* end = nullptr;
    const unsigned long requested = std::strtoul(p, &end, 10);
    if (end == p)
        return defaultCount;

    // strtoul() accepts a leading '-' and returns the negated value wrapped
    // into unsigned long, so "-1" would otherwise be clamped to the maximum
    // worker count. A '-' inside the consumed prefix can only be that sign.
    const char* const parsedEnd = end;
    if (std::find(p, parsedEnd, '-') != parsedEnd)
        return defaultCount;

    constexpr unsigned k_maxFallbackWorkers = 64;
    return static_cast<unsigned>(std::min<unsigned long>(requested, k_maxFallbackWorkers));
}
112
114{
115public:
117 {
118 const unsigned count = fallbackWorkerCountFromEnv();
119 workers_.reserve(count);
120 for (unsigned i = 0; i < count; ++i) {
121 workers_.emplace_back([this] { workerLoop(); });
122 }
123 }
124
126 {
127 {
128 std::lock_guard<std::mutex> lock(mutex_);
129 stopping_ = true;
130 }
131 cv_.notify_all();
132 for (std::thread& worker : workers_) {
133 if (worker.joinable())
134 worker.join();
135 }
136 }
137
140
141 std::size_t workerCount() const noexcept { return workers_.size(); }
142
143 template <class Job>
144 void enqueue(Job&& job)
145 {
146 {
147 std::lock_guard<std::mutex> lock(mutex_);
148 jobs_.emplace_back(std::forward<Job>(job));
149 }
150 cv_.notify_one();
151 }
152
153private:
155 {
156 for (;;) {
157 std::function<void()> job;
158 {
159 std::unique_lock<std::mutex> lock(mutex_);
160 cv_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
161 if (stopping_ && jobs_.empty())
162 return;
163 job = std::move(jobs_.front());
164 jobs_.pop_front();
165 }
166 job();
167 }
168 }
169
170 std::vector<std::thread> workers_;
171 std::deque<std::function<void()>> jobs_;
172 std::mutex mutex_;
173 std::condition_variable cv_;
174 bool stopping_ = false;
175};
176
178{
179 static FallbackThreadPool pool;
180 return pool;
181}
182
183#endif
184
191{
192 const char* p = std::getenv("GROUP2_SERVER_PARALLEL");
193 const bool wantOff = p != nullptr && (p[0] == '0' || p[0] == 'f' || p[0] == 'F' || p[0] == 'n' || p[0] == 'N');
194 const bool wantOn = !wantOff;
195 parallelEnabled.store(wantOn, std::memory_order_release);
196#if defined(GROUP2_HAVE_TBB)
197 SDL_Log("[perf] parallel kernels: %s (TBB-backed; default ON, set GROUP2_SERVER_PARALLEL=0 to disable)",
198 wantOn ? "ENABLED" : "disabled");
199#else
200 if (wantOn) {
201 const std::size_t workers = fallbackPool().workerCount();
202 SDL_Log(
203 "[perf] parallel kernels: %s (std::thread fallback, %zu workers; set GROUP2_SERVER_PARALLEL=0 to disable)",
204 workers > 0 ? "ENABLED" : "sequential",
205 workers);
206 } else {
207 SDL_Log("[perf] parallel kernels: disabled (std::thread fallback available)");
208 }
209#endif
210}
211
216template <class Iter, class Fn>
217inline void parallelFor(Iter begin, Iter end, Fn&& fn)
218{
219#if defined(GROUP2_HAVE_TBB)
220 const auto distance = std::distance(begin, end);
221 if (parallelEnabled.load(std::memory_order_relaxed) && static_cast<std::size_t>(distance) >= k_parallelThreshold) {
222 tbb::parallel_for_each(begin, end, std::forward<Fn>(fn));
223 return;
224 }
225#else
226 const auto distance = std::distance(begin, end);
227 if (distance <= 0)
228 return;
229
230 const auto count = static_cast<std::size_t>(distance);
231 if (parallelEnabled.load(std::memory_order_relaxed) && count >= k_parallelThreshold && !inParallelKernel) {
233 const std::size_t workers = pool.workerCount();
234 if (workers > 0) {
235 const std::size_t chunks = std::min<std::size_t>(count, workers + 1);
236 if (chunks > 1) {
237 struct WaitState
238 {
239 std::mutex mutex;
240 std::condition_variable cv;
241 std::size_t remaining = 0;
242 std::exception_ptr exception;
243 };
244
245 auto state = std::make_shared<WaitState>();
246 state->remaining = chunks - 1;
247
248 auto runChunk = [&](std::size_t chunk) {
249 const std::size_t first = (chunk * count) / chunks;
250 const std::size_t last = ((chunk + 1) * count) / chunks;
251 auto it = begin;
252 std::advance(it, static_cast<decltype(distance)>(first));
253 auto chunkEnd = begin;
254 std::advance(chunkEnd, static_cast<decltype(distance)>(last));
255 for (; it != chunkEnd; ++it) {
256 fn(*it);
257 }
258 };
259
260 auto finish = [state](std::exception_ptr ex) {
261 std::lock_guard<std::mutex> lock(state->mutex);
262 if (ex && !state->exception)
263 state->exception = ex;
264 if (--state->remaining == 0)
265 state->cv.notify_one();
266 };
267
268 for (std::size_t chunk = 1; chunk < chunks; ++chunk) {
269 pool.enqueue([&, finish, chunk] {
270 std::exception_ptr ex;
271 try {
272 ParallelFlagGuard guard;
273 runChunk(chunk);
274 } catch (...) {
275 ex = std::current_exception();
276 }
277 finish(ex);
278 });
279 }
280
281 std::exception_ptr mainException;
282 try {
283 ParallelFlagGuard guard;
284 runChunk(0);
285 } catch (...) {
286 mainException = std::current_exception();
287 }
288
289 {
290 std::unique_lock<std::mutex> lock(state->mutex);
291 state->cv.wait(lock, [state] { return state->remaining == 0; });
292 }
293
294 if (mainException)
295 std::rethrow_exception(mainException);
296 if (state->exception)
297 std::rethrow_exception(state->exception);
298 return;
299 }
300 }
301 }
302#endif
303 std::for_each(begin, end, std::forward<Fn>(fn));
304}
305
306} // namespace group2::perf
Definition Parallel.hpp:114
void workerLoop()
Definition Parallel.hpp:154
FallbackThreadPool & operator=(const FallbackThreadPool &)=delete
std::size_t workerCount() const noexcept
Definition Parallel.hpp:141
void enqueue(Job &&job)
Definition Parallel.hpp:144
FallbackThreadPool()
Definition Parallel.hpp:116
std::deque< std::function< void()> > jobs_
Definition Parallel.hpp:171
std::mutex mutex_
Definition Parallel.hpp:172
std::vector< std::thread > workers_
Definition Parallel.hpp:170
bool stopping_
Definition Parallel.hpp:174
FallbackThreadPool(const FallbackThreadPool &)=delete
~FallbackThreadPool()
Definition Parallel.hpp:125
std::condition_variable cv_
Definition Parallel.hpp:173
Definition Parallel.hpp:86
bool previous_
Definition Parallel.hpp:92
~ParallelFlagGuard()
Definition Parallel.hpp:89
ParallelFlagGuard()
Definition Parallel.hpp:88
Definition Parallel.hpp:53
void parallelFor(Iter begin, Iter end, Fn &&fn)
Call fn(*it) for every element in [begin, end).
Definition Parallel.hpp:217
std::atomic< bool > parallelEnabled
Master switch for parallel execution.
Definition Parallel.hpp:74
bool inParallelKernel
Definition Parallel.hpp:83
void initParallelFromEnv()
Initialize from environment.
Definition Parallel.hpp:190
unsigned fallbackWorkerCountFromEnv()
Definition Parallel.hpp:95
constexpr std::size_t k_parallelThreshold
Minimum items below which parallelFor runs sequentially even when the master switch is on.
Definition Parallel.hpp:79
FallbackThreadPool & fallbackPool()
Definition Parallel.hpp:177