std::array<std::size_t, 2> range;
Manager *manager;
EntitiesType *entities;
- const BitsetType *signature;
+ BitsetType signature;
void *userData;
std::unordered_set<std::size_t> dead;
};
}
else
{
- std::array<TPFnDataStructZero, ThreadCount * 2> fnDataAr;
+ std::array<TPFnDataStructZero*, ThreadCount * 2> fnDataAr;
std::size_t s = currentSize / (ThreadCount * 2);
for(std::size_t i = 0; i < ThreadCount * 2; ++i) {
if(begin == end) {
continue;
}
- fnDataAr[i].range = {begin, end};
- fnDataAr[i].manager = this;
- fnDataAr[i].entities = &entities;
- fnDataAr[i].signature = &signatureBitset;
- fnDataAr[i].userData = userData;
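+ // Heap-allocate per-task data so it stays valid until the task that
+ // receives it frees it; stack storage could be destroyed before every
+ // queued task has finished running.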
+ fnDataAr[i] = new TPFnDataStructZero{};
+ fnDataAr[i]->range = {begin, end};
+ fnDataAr[i]->manager = this;
+ fnDataAr[i]->entities = &entities;
+ fnDataAr[i]->signature = signatureBitset;
+ fnDataAr[i]->userData = userData;
for(std::size_t j = begin; j < end; ++j) {
if(!isAlive(j)) {
- fnDataAr[i].dead.insert(j);
+ fnDataAr[i]->dead.insert(j);
}
}
continue;
}
- if(((*data->signature)
+ if(((data->signature)
& std::get<BitsetType>(
data->entities->at(i)))
- == *data->signature) {
+ == data->signature) {
Helper::call(i,
*data->manager,
std::forward<Function>(function),
data->userData);
}
}
- }, &fnDataAr[i]);
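+ // The task owns its data; release it now that this range is processed.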
+ delete data;
+ }, fnDataAr[i]);
}
threadPool->easyWakeAndWait();
}
may not see as great a speed-up.
*/
template <typename Signature>
void forMatchingSimple(ForMatchingFn fn,
void *userData = nullptr,
const bool useThreadPool = false) {
deferringDeletions.fetch_add(1);
}
}
} else {
- std::array<TPFnDataStructZero, ThreadCount * 2> fnDataAr;
+ std::array<TPFnDataStructZero*, ThreadCount * 2> fnDataAr;
std::size_t s = currentSize / (ThreadCount * 2);
for(std::size_t i = 0; i < ThreadCount * 2; ++i) {
if(begin == end) {
continue;
}
- fnDataAr[i].range = {begin, end};
- fnDataAr[i].manager = this;
- fnDataAr[i].entities = &entities;
- fnDataAr[i].signature = &signatureBitset;
- fnDataAr[i].userData = userData;
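+ // Same ownership scheme as above: each queued task frees its own
+ // heap-allocated data when it finishes.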
+ fnDataAr[i] = new TPFnDataStructZero{};
+ fnDataAr[i]->range = {begin, end};
+ fnDataAr[i]->manager = this;
+ fnDataAr[i]->entities = &entities;
+ fnDataAr[i]->signature = signatureBitset;
+ fnDataAr[i]->userData = userData;
for(std::size_t j = begin; j < end; ++j) {
if(!isAlive(j)) {
- fnDataAr[i].dead.insert(j);
+ fnDataAr[i]->dead.insert(j);
}
}
threadPool->queueFn([&fn] (void *ud) {
++i) {
if(data->dead.find(i) != data->dead.end()) {
continue;
- } else if((*data->signature
+ } else if((data->signature
& std::get<BitsetType>(
data->entities->at(i)))
- == *data->signature) {
+ == data->signature) {
fn(i, data->manager, data->userData);
}
}
- }, &fnDataAr[i]);
+ delete data;
+ }, fnDataAr[i]);
}
threadPool->easyWakeAndWait();
}
#include <functional>
#include <tuple>
#include <chrono>
+#include <unordered_set>
+
+#ifndef NDEBUG
+# include <iostream>
+#endif
namespace EC {
using TPFnType = std::function<void(void*)>;
using TPTupleType = std::tuple<TPFnType, void*>;
using TPQueueType = std::queue<TPTupleType>;
+
+ template <unsigned int SIZE>
+ void thread_fn(std::atomic_bool *isAlive,
+ std::condition_variable *cv,
+ std::mutex *cvMutex,
+ Internal::TPQueueType *fnQueue,
+ std::mutex *queueMutex,
+ std::atomic_int *waitCount) {
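+ // Worker loop: repeatedly drain tasks from the shared queue; when the
+ // queue is empty, block on the condition variable until woken.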
+ bool hasFn = false;
+ Internal::TPTupleType fnTuple;
+ while(isAlive->load()) {
+ hasFn = false;
+ {
+ std::lock_guard<std::mutex> lock(*queueMutex);
+ if(!fnQueue->empty()) {
+ fnTuple = fnQueue->front();
+ fnQueue->pop();
+ hasFn = true;
+ }
+ }
+ if(hasFn) {
+ std::get<0>(fnTuple)(std::get<1>(fnTuple));
+ continue;
+ }
+
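+ // No task was available: mark this worker as waiting, then block.
+ // Note that wait() takes no predicate; easyWakeAndWait() re-notifies
+ // in a loop, so a missed wakeup only delays work rather than losing it.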
+ waitCount->fetch_add(1);
+ {
+ std::unique_lock<std::mutex> lock(*cvMutex);
+ cv->wait(lock);
+ }
+ waitCount->fetch_sub(1);
+ }
+ }
} // namespace Internal
/*!
template <unsigned int SIZE>
class ThreadPool {
public:
- ThreadPool() : waitCount(0) {
+ ThreadPool() {
+ waitCount.store(0);
+ extraThreadCount.store(0);
isAlive.store(true);
if(SIZE >= 2) {
for(unsigned int i = 0; i < SIZE; ++i) {
- threads.emplace_back([] (std::atomic_bool *isAlive,
- std::condition_variable *cv,
- std::mutex *cvMutex,
- Internal::TPQueueType *fnQueue,
- std::mutex *queueMutex,
- int *waitCount,
- std::mutex *waitCountMutex) {
- bool hasFn = false;
- Internal::TPTupleType fnTuple;
- while(isAlive->load()) {
- hasFn = false;
- {
- std::lock_guard<std::mutex> lock(*queueMutex);
- if(!fnQueue->empty()) {
- fnTuple = fnQueue->front();
- fnQueue->pop();
- hasFn = true;
- }
- }
- if(hasFn) {
- std::get<0>(fnTuple)(std::get<1>(fnTuple));
- continue;
- }
-
- {
- std::lock_guard<std::mutex> lock(*waitCountMutex);
- *waitCount += 1;
- }
- {
- std::unique_lock<std::mutex> lock(*cvMutex);
- cv->wait(lock);
- }
- {
- std::lock_guard<std::mutex> lock(*waitCountMutex);
- *waitCount -= 1;
- }
- }
- }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount,
- &waitCountMutex);
+ threads.emplace_back(Internal::thread_fn<SIZE>,
+ &isAlive,
+ &cv,
+ &cvMutex,
+ &fnQueue,
+ &queueMutex,
+ &waitCount);
+ threadsIDs.insert(threads.back().get_id());
}
}
}
for(auto &thread : threads) {
thread.join();
}
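+ // join() covers only the fixed pool; detached helper threads spawned
+ // by wakeThreads() may still be finishing, so give them a brief grace
+ // period before the pool's members are destroyed.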
+ std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
If SIZE is 2 or greater, then this function will return immediately after
waking one or all threads, depending on the given boolean parameter.
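+
+ Example usage (a minimal sketch; assumes tasks were queued via queueFn):
+
+     ThreadPool<4> pool;
+     int value = 0;
+     pool.queueFn([] (void *ud) { ++*static_cast<int*>(ud); }, &value);
+     pool.wakeThreads(); // wake all workers to drain the queue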
*/
- void wakeThreads(bool wakeAll = true) {
+ void wakeThreads(const bool wakeAll = true) {
if(SIZE >= 2) {
// wake threads to pull functions from queue and run them
if(wakeAll) {
} else {
cv.notify_one();
}
+
+ // If every pool thread is already running a task (which can happen
+ // when a task queues more work from inside the pool), run one queued
+ // task on a detached helper thread so nested waits cannot deadlock.
+ Internal::TPTupleType fnTuple;
+ bool hasFn = false;
+ if (waitCount.load(std::memory_order_relaxed) == 0) {
+ std::lock_guard<std::mutex> queueLock(queueMutex);
+ if (!fnQueue.empty()) {
+ fnTuple = fnQueue.front();
+ fnQueue.pop();
+ hasFn = true;
+ }
+ }
+
+ if (hasFn) {
+#ifndef NDEBUG
+ std::cout << "Spawning extra thread...\n";
+#endif
+ extraThreadCount.fetch_add(1);
+ std::thread newThread = std::thread(
+ [] (Internal::TPTupleType &&tuple, std::atomic_int *count) {
+ std::get<0>(tuple)(std::get<1>(tuple));
+#ifndef NDEBUG
+ std::cout << "Stopping extra thread...\n";
+#endif
+ count->fetch_sub(1);
+ },
+ std::move(fnTuple), &extraThreadCount);
+ newThread.detach();
+ }
} else {
sequentiallyRunTasks();
}
If SIZE is less than 2, then this will always return 0.
*/
int getWaitCount() {
- std::lock_guard<std::mutex> lock(waitCountMutex);
- return waitCount;
+ return waitCount.load(std::memory_order_relaxed);
}
/*!
*/
bool isAllThreadsWaiting() {
if(SIZE >= 2) {
- std::lock_guard<std::mutex> lock(waitCountMutex);
- return waitCount == SIZE;
+ return waitCount.load(std::memory_order_relaxed) == SIZE;
} else {
return true;
}
*/
void easyWakeAndWait() {
if(SIZE >= 2) {
- wakeThreads();
do {
+ wakeThreads();
std::this_thread::sleep_for(std::chrono::microseconds(150));
- } while(!isQueueEmpty() || !isAllThreadsWaiting());
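+ // A pool worker making a nested call must not wait for the pool's own
+ // workers to go idle (that would deadlock); it waits only for the
+ // queue to drain and detached helper threads to finish. External
+ // callers additionally wait for every worker to go idle.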
+ } while(!isQueueEmpty()
+ || extraThreadCount.load(std::memory_order_relaxed) != 0
+ || (threadsIDs.find(std::this_thread::get_id()) == threadsIDs.end()
+ && !isAllThreadsWaiting()));
} else {
sequentiallyRunTasks();
}
private:
std::vector<std::thread> threads;
+ std::unordered_set<std::thread::id> threadsIDs;
std::atomic_bool isAlive;
std::condition_variable cv;
std::mutex cvMutex;
Internal::TPQueueType fnQueue;
std::mutex queueMutex;
- int waitCount;
- std::mutex waitCountMutex;
+ std::atomic_int waitCount;
+ std::atomic_int extraThreadCount;
void sequentiallyRunTasks() {
// pull functions from queue and run them on current thread
#include <gtest/gtest.h>
+#include <chrono>
#include <iostream>
+#include <thread>
#include <tuple>
#include <memory>
#include <unordered_map>
}
}
}
+
+TEST(EC, NestedThreadPoolTasks) {
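+ // The outer loop runs on the thread pool (useThreadPool == true); the
+ // nested loop is invoked from inside a worker thread with
+ // useThreadPool == false, so it runs inline on that worker.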
+ using ManagerType = EC::Manager<ListComponentsAll, ListTagsAll, 2>;
+ ManagerType manager;
+
+ std::array<std::size_t, 64> entities;
+ for (auto &entity : entities) {
+ entity = manager.addEntity();
+ manager.addComponent<C0>(entity, entity, entity);
+ }
+
+ manager.forMatchingSignature<EC::Meta::TypeList<C0>>([] (std::size_t id, void *data, C0 *c) {
+ ManagerType *manager = (ManagerType*)data;
+
+ manager->forMatchingSignature<EC::Meta::TypeList<C0>>([id] (std::size_t inner_id, void* data, C0 *inner_c) {
+ const C0 *const outer_c = (C0*)data;
+ EXPECT_EQ(id, outer_c->x);
+ EXPECT_EQ(inner_id, inner_c->x);
+ if (id == inner_id) {
+ EXPECT_EQ(outer_c->x, inner_c->x);
+ EXPECT_EQ(outer_c->y, inner_c->y);
+ } else {
+ EXPECT_NE(outer_c->x, inner_c->x);
+ EXPECT_NE(outer_c->y, inner_c->y);
+ }
+ }, c, false);
+ }, &manager, true);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+}