std::size_t currentSize = 0;
std::unordered_set<std::size_t> deletedSet;
- ThreadPool<ThreadCount> threadPool;
+ std::unique_ptr<ThreadPool<ThreadCount> > threadPool;
public:
/*!
Manager()
{
resize(EC_INIT_ENTITIES_SIZE);
+ if(ThreadCount >= 2) {
+ threadPool = std::make_unique<ThreadPool<ThreadCount> >();
+ }
}
private:
BitsetType signatureBitset =
BitsetType::template generateBitset<Signature>();
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(std::size_t i = 0; i < currentSize; ++i)
{
std::get<2>(fnDataAr.at(i)) = &signatureBitset;
std::get<3>(fnDataAr.at(i)) = {begin, end};
std::get<4>(fnDataAr.at(i)) = userData;
- threadPool.queueFn([&function] (void *ud) {
+ threadPool->queueFn([&function] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<3>(*data).at(0);
i < std::get<3>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
}
BitsetType signatureBitset =
BitsetType::template generateBitset<Signature>();
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(std::size_t i = 0; i < currentSize; ++i)
{
std::get<3>(fnDataAr.at(i)) = {begin, end};
std::get<4>(fnDataAr.at(i)) = userData;
std::get<5>(fnDataAr.at(i)) = function;
- threadPool.queueFn([] (void *ud) {
+ threadPool->queueFn([] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<3>(*data).at(0);
i < std::get<3>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
}
std::vector<std::size_t> matching,
void* userData)
{
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(auto eid : matching)
{
std::get<2>(fnDataAr.at(i)) = {begin, end};
std::get<3>(fnDataAr.at(i)) = userData;
std::get<4>(fnDataAr.at(i)) = &matching;
- threadPool.queueFn([function, helper] (void* ud) {
+ threadPool->queueFn([function, helper] (void* ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<2>(*data).at(0);
i < std::get<2>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
})));
{
std::vector<std::vector<std::size_t> > matchingV(bitsets.size());
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(std::size_t i = 0; i < currentSize; ++i)
{
std::get<3>(fnDataAr.at(i)) = &bitsets;
std::get<4>(fnDataAr.at(i)) = &entities;
std::get<5>(fnDataAr.at(i)) = &mutex;
- threadPool.queueFn([] (void *ud) {
+ threadPool->queueFn([] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<1>(*data).at(0);
i < std::get<1>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
return matchingV;
});
// find and store entities matching signatures
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(std::size_t eid = 0; eid < currentSize; ++eid)
{
std::get<3>(fnDataAr.at(i)) = signatureBitsets;
std::get<4>(fnDataAr.at(i)) = &mutex;
- threadPool.queueFn([] (void *ud) {
+ threadPool->queueFn([] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<1>(*data).at(0);
i < std::get<1>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
// call functions on matching entities
EC::Meta::Morph<
SignatureComponents,
ForMatchingSignatureHelper<> >;
- if(!useThreadPool) {
+ if(!useThreadPool || !threadPool) {
for(const auto& id : multiMatchingEntities[index]) {
if(isAlive(id)) {
Helper::call(id, *this, func, userData);
std::get<2>(fnDataAr.at(i)) = {begin, end};
std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities;
std::get<4>(fnDataAr.at(i)) = index;
- threadPool.queueFn([&func] (void *ud) {
+ threadPool->queueFn([&func] (void *ud) {
auto *data = static_cast<TPFnType*>(ud);
for(std::size_t i = std::get<2>(*data).at(0);
i < std::get<2>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
}
);
});
// find and store entities matching signatures
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(std::size_t eid = 0; eid < currentSize; ++eid)
{
std::get<3>(fnDataAr.at(i)) = signatureBitsets;
std::get<4>(fnDataAr.at(i)) = &mutex;
- threadPool.queueFn([] (void *ud) {
+ threadPool->queueFn([] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<1>(*data).at(0);
i < std::get<1>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
// call functions on matching entities
EC::Meta::Morph<
SignatureComponents,
ForMatchingSignatureHelper<> >;
- if(!useThreadPool)
+ if(!useThreadPool || !threadPool)
{
for(const auto& id : multiMatchingEntities[index])
{
std::get<2>(fnDataAr.at(i)) = {begin, end};
std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities;
std::get<4>(fnDataAr.at(i)) = index;
- threadPool.queueFn([&func] (void *ud) {
+ threadPool->queueFn([&func] (void *ud) {
auto *data = static_cast<TPFnType*>(ud);
for(std::size_t i = std::get<2>(*data).at(0);
i < std::get<2>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
}
);
template <typename Signature>
void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) {
const BitsetType signatureBitset = BitsetType::template generateBitset<Signature>();
- if(!useThreadPool) {
+ if(!useThreadPool || !threadPool) {
for(std::size_t i = 0; i < currentSize; ++i) {
if(!std::get<bool>(entities[i])) {
continue;
std::get<2>(fnDataAr.at(i)) = &signatureBitset;
std::get<3>(fnDataAr.at(i)) = {begin, end};
std::get<4>(fnDataAr.at(i)) = userData;
- threadPool.queueFn([&fn] (void *ud) {
+ threadPool->queueFn([&fn] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
for(std::size_t i = std::get<3>(*data).at(0);
i < std::get<3>(*data).at(1);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
}
*/
template <typename Iterable>
void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) {
- if(!useThreadPool) {
+ if(!useThreadPool || !threadPool) {
bool isValid;
for(std::size_t i = 0; i < currentSize; ++i) {
if(!std::get<bool>(entities[i])) {
std::get<2>(fnDataAr.at(i)) = &iterable;
std::get<3>(fnDataAr.at(i)) = {begin, end};
std::get<4>(fnDataAr.at(i)) = userData;
- threadPool.queueFn([&fn] (void *ud) {
+ threadPool->queueFn([&fn] (void *ud) {
auto *data = static_cast<TPFnDataType*>(ud);
bool isValid;
for(std::size_t i = std::get<3>(*data).at(0);
}
}, &fnDataAr.at(i));
}
- threadPool.wakeThreads();
+ threadPool->wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting());
+ } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting());
}
}
};
using TPQueueType = std::queue<TPTupleType>;
} // namespace Internal
-template <unsigned int SIZE, typename = void>
-class ThreadPool;
-
template <unsigned int SIZE>
-class ThreadPool<SIZE, typename std::enable_if<(SIZE >= 2)>::type> {
+class ThreadPool {
public:
using THREADCOUNT = std::integral_constant<int, SIZE>;
ThreadPool() : waitCount(0) {
isAlive.store(true);
- for(unsigned int i = 0; i < SIZE; ++i) {
- threads.emplace_back([] (std::atomic_bool *isAlive,
- std::condition_variable *cv,
- std::mutex *cvMutex,
- Internal::TPQueueType *fnQueue,
- std::mutex *queueMutex,
- int *waitCount,
- std::mutex *waitCountMutex) {
- bool hasFn = false;
- Internal::TPTupleType fnTuple;
- while(isAlive->load()) {
- hasFn = false;
- {
- std::lock_guard<std::mutex> lock(*queueMutex);
- if(!fnQueue->empty()) {
- fnTuple = fnQueue->front();
- fnQueue->pop();
- hasFn = true;
+ if(SIZE >= 2) {
+ for(unsigned int i = 0; i < SIZE; ++i) {
+ threads.emplace_back([] (std::atomic_bool *isAlive,
+ std::condition_variable *cv,
+ std::mutex *cvMutex,
+ Internal::TPQueueType *fnQueue,
+ std::mutex *queueMutex,
+ int *waitCount,
+ std::mutex *waitCountMutex) {
+ bool hasFn = false;
+ Internal::TPTupleType fnTuple;
+ while(isAlive->load()) {
+ hasFn = false;
+ {
+ std::lock_guard<std::mutex> lock(*queueMutex);
+ if(!fnQueue->empty()) {
+ fnTuple = fnQueue->front();
+ fnQueue->pop();
+ hasFn = true;
+ }
+ }
+ if(hasFn) {
+ std::get<0>(fnTuple)(std::get<1>(fnTuple));
+ continue;
}
- }
- if(hasFn) {
- std::get<0>(fnTuple)(std::get<1>(fnTuple));
- continue;
- }
- {
- std::lock_guard<std::mutex> lock(*waitCountMutex);
- *waitCount += 1;
- }
- {
- std::unique_lock<std::mutex> lock(*cvMutex);
- cv->wait(lock);
- }
- {
- std::lock_guard<std::mutex> lock(*waitCountMutex);
- *waitCount -= 1;
+ {
+ std::lock_guard<std::mutex> lock(*waitCountMutex);
+ *waitCount += 1;
+ }
+ {
+ std::unique_lock<std::mutex> lock(*cvMutex);
+ cv->wait(lock);
+ }
+ {
+ std::lock_guard<std::mutex> lock(*waitCountMutex);
+ *waitCount -= 1;
+ }
}
- }
- }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount, &waitCountMutex);
+ }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount,
+ &waitCountMutex);
+ }
}
}
~ThreadPool() {
- isAlive.store(false);
- std::this_thread::sleep_for(std::chrono::milliseconds(200));
- cv.notify_all();
- for(auto &thread : threads) {
- thread.join();
+ if(SIZE >= 2) {
+ isAlive.store(false);
+ std::this_thread::sleep_for(std::chrono::milliseconds(20));
+ cv.notify_all();
+ for(auto &thread : threads) {
+ thread.join();
+ }
}
}
}
void wakeThreads(bool wakeAll = true) {
- if(wakeAll) {
- cv.notify_all();
+ if(SIZE >= 2) {
+ // wake threads to pull functions from queue and run them
+ if(wakeAll) {
+ cv.notify_all();
+ } else {
+ cv.notify_one();
+ }
} else {
- cv.notify_one();
+ // pull functions from queue and run them on main thread
+ Internal::TPTupleType fnTuple;
+ bool hasFn;
+ do {
+ {
+ std::lock_guard<std::mutex> lock(queueMutex);
+ if(!fnQueue.empty()) {
+ hasFn = true;
+ fnTuple = fnQueue.front();
+ fnQueue.pop();
+ } else {
+ hasFn = false;
+ }
+ }
+ if(hasFn) {
+ std::get<0>(fnTuple)(std::get<1>(fnTuple));
+ }
+ } while(hasFn);
}
}
}
bool isAllThreadsWaiting() {
- std::lock_guard<std::mutex> lock(waitCountMutex);
- return waitCount == THREADCOUNT::value;
+ if(SIZE >= 2) {
+ std::lock_guard<std::mutex> lock(waitCountMutex);
+ return waitCount == THREADCOUNT::value;
+ } else {
+ return true;
+ }
}
bool isQueueEmpty() {