WIP nestable threads via ThreadPool

Currently has a race-condition-like memory corruption bug where function
contexts are deleted before child functions complete (probably).
Stephen Seo 2022-06-15 16:38:36 +09:00
parent 841a591aa4
commit 3286aa5a74
3 changed files with 149 additions and 71 deletions
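
The failure mode described in the commit message is easy to reproduce in isolation. The sketch below (not from this repository; names are illustrative) hands a worker thread a pointer to stack-allocated per-task context and lets the enclosing scope exit first, which is the same class of lifetime bug suspected above:

#include <chrono>
#include <cstdio>
#include <thread>

struct Context { int value; };

int main() {
    std::thread worker;
    {
        Context ctx{42};  // per-task context on this block's stack
        worker = std::thread([] (Context *c) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            std::printf("%d\n", c->value);  // may read already-destroyed memory
        }, &ctx);
    }  // ctx is destroyed here while the worker may still be running
    worker.join();
    return 0;
}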

File 1 of 3: the Manager implementation

@@ -114,7 +114,7 @@ namespace EC
            std::array<std::size_t, 2> range;
            Manager *manager;
            EntitiesType *entities;
-            const BitsetType *signature;
+            BitsetType signature;
            void *userData;
            std::unordered_set<std::size_t> dead;
        };
@@ -779,7 +779,7 @@ namespace EC
            }
            else
            {
-                std::array<TPFnDataStructZero, ThreadCount * 2> fnDataAr;
+                std::array<TPFnDataStructZero*, ThreadCount * 2> fnDataAr;
                std::size_t s = currentSize / (ThreadCount * 2);
                for(std::size_t i = 0; i < ThreadCount * 2; ++i) {
@@ -793,14 +793,15 @@ namespace EC
                    if(begin == end) {
                        continue;
                    }
-                    fnDataAr[i].range = {begin, end};
-                    fnDataAr[i].manager = this;
-                    fnDataAr[i].entities = &entities;
-                    fnDataAr[i].signature = &signatureBitset;
-                    fnDataAr[i].userData = userData;
+                    fnDataAr[i] = new TPFnDataStructZero{};
+                    fnDataAr[i]->range = {begin, end};
+                    fnDataAr[i]->manager = this;
+                    fnDataAr[i]->entities = &entities;
+                    fnDataAr[i]->signature = signatureBitset;
+                    fnDataAr[i]->userData = userData;
                    for(std::size_t j = begin; j < end; ++j) {
                        if(!isAlive(j)) {
-                            fnDataAr[i].dead.insert(j);
+                            fnDataAr[i]->dead.insert(j);
                        }
                    }
@@ -812,17 +813,18 @@ namespace EC
                            continue;
                        }
-                        if(((*data->signature)
+                        if(((data->signature)
                            & std::get<BitsetType>(
                                data->entities->at(i)))
-                            == *data->signature) {
+                            == data->signature) {
                            Helper::call(i,
                                         *data->manager,
                                         std::forward<Function>(function),
                                         data->userData);
                        }
                    }
-                }, &fnDataAr[i]);
+                    delete data;
+                }, fnDataAr[i]);
                }
                threadPool->easyWakeAndWait();
            }
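
The hunk above changes ownership of the per-task context: instead of a stack-allocated std::array of structs that dies with the enclosing call, each context is now heap-allocated and freed by the task itself via delete data, so it can outlive the scope that queued it. A condensed sketch of the pattern (hypothetical names, not the repository's API):

#include <cstddef>
#include <functional>
#include <queue>

struct TaskContext { std::size_t begin; std::size_t end; };

std::queue<std::function<void()>> taskQueue;  // stand-in for the pool's queue

void enqueueRange(std::size_t begin, std::size_t end) {
    auto *ctx = new TaskContext{begin, end};  // ownership passes to the task
    taskQueue.push([ctx] {
        for (std::size_t i = ctx->begin; i < ctx->end; ++i) {
            // ... process entity i ...
        }
        delete ctx;  // the task frees its own context when it finishes
    });
}

One trade-off of this pattern: a context whose task never runs is leaked, and the queued lambdas elsewhere in this commit still capture the user-provided function by reference, which may be related to the bug noted in the commit message.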
@@ -1874,7 +1876,7 @@ namespace EC
        may not have as great of a speed-up.
    */
    template <typename Signature>
    void forMatchingSimple(ForMatchingFn fn,
                           void *userData = nullptr,
                           const bool useThreadPool = false) {
        deferringDeletions.fetch_add(1);
@@ -1891,7 +1893,7 @@ namespace EC
                }
            }
        } else {
-            std::array<TPFnDataStructZero, ThreadCount * 2> fnDataAr;
+            std::array<TPFnDataStructZero*, ThreadCount * 2> fnDataAr;
            std::size_t s = currentSize / (ThreadCount * 2);
            for(std::size_t i = 0; i < ThreadCount * 2; ++i) {
@@ -1905,14 +1907,15 @@ namespace EC
                if(begin == end) {
                    continue;
                }
-                fnDataAr[i].range = {begin, end};
-                fnDataAr[i].manager = this;
-                fnDataAr[i].entities = &entities;
-                fnDataAr[i].signature = &signatureBitset;
-                fnDataAr[i].userData = userData;
+                fnDataAr[i] = new TPFnDataStructZero{};
+                fnDataAr[i]->range = {begin, end};
+                fnDataAr[i]->manager = this;
+                fnDataAr[i]->entities = &entities;
+                fnDataAr[i]->signature = signatureBitset;
+                fnDataAr[i]->userData = userData;
                for(std::size_t j = begin; j < end; ++j) {
                    if(!isAlive(j)) {
-                        fnDataAr[i].dead.insert(j);
+                        fnDataAr[i]->dead.insert(j);
                    }
                }
                threadPool->queueFn([&fn] (void *ud) {
@@ -1921,14 +1924,15 @@ namespace EC
                         ++i) {
                        if(data->dead.find(i) != data->dead.end()) {
                            continue;
-                        } else if((*data->signature
+                        } else if((data->signature
                                   & std::get<BitsetType>(
                                       data->entities->at(i)))
-                                   == *data->signature) {
+                                   == data->signature) {
                            fn(i, data->manager, data->userData);
                        }
                    }
-                }, &fnDataAr[i]);
+                    delete data;
+                }, fnDataAr[i]);
            }
            threadPool->easyWakeAndWait();
        }

File 2 of 3: the ThreadPool implementation

@@ -11,6 +11,11 @@
 #include <functional>
 #include <tuple>
 #include <chrono>
+#include <unordered_set>
+
+#ifndef NDEBUG
+# include <iostream>
+#endif

 namespace EC {
@@ -18,6 +23,39 @@ namespace Internal {
    using TPFnType = std::function<void(void*)>;
    using TPTupleType = std::tuple<TPFnType, void*>;
    using TPQueueType = std::queue<TPTupleType>;
+
+    template <unsigned int SIZE>
+    void thread_fn(std::atomic_bool *isAlive,
+                   std::condition_variable *cv,
+                   std::mutex *cvMutex,
+                   Internal::TPQueueType *fnQueue,
+                   std::mutex *queueMutex,
+                   std::atomic_int *waitCount) {
+        bool hasFn = false;
+        Internal::TPTupleType fnTuple;
+        while(isAlive->load()) {
+            hasFn = false;
+            {
+                std::lock_guard<std::mutex> lock(*queueMutex);
+                if(!fnQueue->empty()) {
+                    fnTuple = fnQueue->front();
+                    fnQueue->pop();
+                    hasFn = true;
+                }
+            }
+            if(hasFn) {
+                std::get<0>(fnTuple)(std::get<1>(fnTuple));
+                continue;
+            }
+            waitCount->fetch_add(1);
+            {
+                std::unique_lock<std::mutex> lock(*cvMutex);
+                cv->wait(lock);
+            }
+            waitCount->fetch_sub(1);
+        }
+    }
 } // namespace Internal
@@ -29,49 +67,20 @@ namespace Internal {
 template <unsigned int SIZE>
 class ThreadPool {
 public:
-    ThreadPool() : waitCount(0) {
+    ThreadPool() {
+        waitCount.store(0);
+        extraThreadCount.store(0);
         isAlive.store(true);
         if(SIZE >= 2) {
             for(unsigned int i = 0; i < SIZE; ++i) {
-                threads.emplace_back([] (std::atomic_bool *isAlive,
-                                         std::condition_variable *cv,
-                                         std::mutex *cvMutex,
-                                         Internal::TPQueueType *fnQueue,
-                                         std::mutex *queueMutex,
-                                         int *waitCount,
-                                         std::mutex *waitCountMutex) {
-                    bool hasFn = false;
-                    Internal::TPTupleType fnTuple;
-                    while(isAlive->load()) {
-                        hasFn = false;
-                        {
-                            std::lock_guard<std::mutex> lock(*queueMutex);
-                            if(!fnQueue->empty()) {
-                                fnTuple = fnQueue->front();
-                                fnQueue->pop();
-                                hasFn = true;
-                            }
-                        }
-                        if(hasFn) {
-                            std::get<0>(fnTuple)(std::get<1>(fnTuple));
-                            continue;
-                        }
-                        {
-                            std::lock_guard<std::mutex> lock(*waitCountMutex);
-                            *waitCount += 1;
-                        }
-                        {
-                            std::unique_lock<std::mutex> lock(*cvMutex);
-                            cv->wait(lock);
-                        }
-                        {
-                            std::lock_guard<std::mutex> lock(*waitCountMutex);
-                            *waitCount -= 1;
-                        }
-                    }
-                }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount,
-                &waitCountMutex);
+                threads.emplace_back(Internal::thread_fn<SIZE>,
+                                     &isAlive,
+                                     &cv,
+                                     &cvMutex,
+                                     &fnQueue,
+                                     &queueMutex,
+                                     &waitCount);
+                threadsIDs.insert(threads.back().get_id());
             }
         }
     }
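
The constructor refactor above works because std::thread copies its arguments: Internal::thread_fn<SIZE> receives pointers to the pool's own members, which stay valid for the threads' lifetimes since the destructor joins them. A reduced sketch of the same pattern (illustrative names; the real worker blocks on a condition variable rather than spinning):

#include <atomic>
#include <thread>
#include <vector>

void worker(std::atomic_bool *alive) {
    while (alive->load()) {
        // poll the task queue, run a task, or wait to be woken
    }
}

struct Pool {
    std::atomic_bool alive{true};
    std::vector<std::thread> threads;

    Pool() {
        // std::thread copies the pointer argument; it remains valid because
        // ~Pool() joins the thread before 'alive' is destroyed
        threads.emplace_back(worker, &alive);
    }

    ~Pool() {
        alive.store(false);
        for (auto &t : threads) {
            t.join();
        }
    }
};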
@@ -84,6 +93,7 @@ public:
            for(auto &thread : threads) {
                thread.join();
            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }
    }
@@ -108,7 +118,7 @@ public:
    If SIZE is 2 or greater, then this function will return immediately after
    waking one or all threads, depending on the given boolean parameter.
    */
-    void wakeThreads(bool wakeAll = true) {
+    void wakeThreads(const bool wakeAll = true) {
        if(SIZE >= 2) {
            // wake threads to pull functions from queue and run them
            if(wakeAll) {
@@ -116,6 +126,36 @@ public:
            } else {
                cv.notify_one();
            }
+            // check if all threads are running a task, and spawn a new thread
+            // if this is the case
+            Internal::TPTupleType fnTuple;
+            bool hasFn = false;
+            if (waitCount.load(std::memory_order_relaxed) == 0) {
+                std::lock_guard<std::mutex> queueLock(queueMutex);
+                if (!fnQueue.empty()) {
+                    fnTuple = fnQueue.front();
+                    fnQueue.pop();
+                    hasFn = true;
+                }
+            }
+            if (hasFn) {
+#ifndef NDEBUG
+                std::cout << "Spawning extra thread...\n";
+#endif
+                extraThreadCount.fetch_add(1);
+                std::thread newThread = std::thread(
+                    [] (Internal::TPTupleType &&tuple, std::atomic_int *count) {
+                        std::get<0>(tuple)(std::get<1>(tuple));
+#ifndef NDEBUG
+                        std::cout << "Stopping extra thread...\n";
+#endif
+                        count->fetch_sub(1);
+                    },
+                    std::move(fnTuple), &extraThreadCount);
+                newThread.detach();
+            }
        } else {
            sequentiallyRunTasks();
        }
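
This extra-thread path is what makes nested tasks viable: if every pool worker is already inside a parent task, child tasks queued by that parent can never start, so a wait inside the parent would never finish. wakeThreads() now detects the situation (waitCount is zero while the queue is non-empty), pops one task, and runs it on a detached one-shot thread. A hypothetical illustration using this header's own API (queueFn and easyWakeAndWait as used elsewhere in this commit; the include path is an assumption):

#include "ThreadPool.hpp"  // assumed path to this header

int main() {
    EC::ThreadPool<2> pool;
    // Occupy every worker with a parent task that itself queues a child.
    for (int i = 0; i < 2; ++i) {
        pool.queueFn([] (void *ud) {
            auto *p = static_cast<EC::ThreadPool<2>*>(ud);
            p->queueFn([] (void*) { /* child task */ }, nullptr);
            // With both workers stuck in parent tasks, no pool thread is
            // free to run the child; the extra-thread spawn in wakeThreads()
            // is what lets this wait complete.
            p->easyWakeAndWait();
        }, &pool);
    }
    pool.easyWakeAndWait();
    return 0;
}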
@@ -129,8 +169,7 @@ public:
    If SIZE is less than 2, then this will always return 0.
    */
    int getWaitCount() {
-        std::lock_guard<std::mutex> lock(waitCountMutex);
-        return waitCount;
+        return waitCount.load(std::memory_order_relaxed);
    }

    /*!
@@ -140,8 +179,7 @@ public:
    */
    bool isAllThreadsWaiting() {
        if(SIZE >= 2) {
-            std::lock_guard<std::mutex> lock(waitCountMutex);
-            return waitCount == SIZE;
+            return waitCount.load(std::memory_order_relaxed) == SIZE;
        } else {
            return true;
        }
@@ -173,10 +211,13 @@ public:
    */
    void easyWakeAndWait() {
        if(SIZE >= 2) {
+            wakeThreads();
            do {
-                wakeThreads();
                std::this_thread::sleep_for(std::chrono::microseconds(150));
-            } while(!isQueueEmpty() || !isAllThreadsWaiting());
+            } while(!isQueueEmpty()
+                    || (threadsIDs.find(std::this_thread::get_id()) != threadsIDs.end()
+                        && extraThreadCount.load(std::memory_order_relaxed) != 0));
+            // } while(!isQueueEmpty() || !isAllThreadsWaiting());
        } else {
            sequentiallyRunTasks();
        }
@@ -184,13 +225,14 @@ public:
 private:
    std::vector<std::thread> threads;
+    std::unordered_set<std::thread::id> threadsIDs;
    std::atomic_bool isAlive;
    std::condition_variable cv;
    std::mutex cvMutex;
    Internal::TPQueueType fnQueue;
    std::mutex queueMutex;
-    int waitCount;
-    std::mutex waitCountMutex;
+    std::atomic_int waitCount;
+    std::atomic_int extraThreadCount;
    void sequentiallyRunTasks() {
        // pull functions from queue and run them on current thread
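
The member changes above replace the mutex-guarded int with std::atomic_int, taking a lock out of the worker hot path; relaxed ordering suffices because the count is read only as an advisory snapshot, not to synchronize other memory. In miniature:

#include <atomic>
#include <mutex>

// Before: every read of the counter takes a lock.
struct CounterBefore {
    int waitCount = 0;
    std::mutex waitCountMutex;
    int get() {
        std::lock_guard<std::mutex> lock(waitCountMutex);
        return waitCount;
    }
};

// After: lock-free reads; a relaxed load is enough for a momentary snapshot.
struct CounterAfter {
    std::atomic_int waitCount{0};
    int get() { return waitCount.load(std::memory_order_relaxed); }
};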

File 3 of 3: the unit tests

@@ -1,7 +1,9 @@
 #include <gtest/gtest.h>

+#include <chrono>
 #include <iostream>
+#include <thread>
 #include <tuple>
 #include <memory>
 #include <unordered_map>
@@ -1431,3 +1433,33 @@ TEST(EC, ManagerDeferredDeletions) {
        }
    }
 }
+
+TEST(EC, NestedThreadPoolTasks) {
+    using ManagerType = EC::Manager<ListComponentsAll, ListTagsAll, 2>;
+    ManagerType manager;
+
+    std::array<std::size_t, 64> entities;
+    for (auto &entity : entities) {
+        entity = manager.addEntity();
+        manager.addComponent<C0>(entity, entity, entity);
+    }
+
+    manager.forMatchingSignature<EC::Meta::TypeList<C0>>(
+        [] (std::size_t id, void *data, C0 *c) {
+            ManagerType *manager = (ManagerType*)data;
+            manager->forMatchingSignature<EC::Meta::TypeList<C0>>(
+                [id] (std::size_t inner_id, void *data, C0 *inner_c) {
+                    const C0 *const outer_c = (C0*)data;
+                    EXPECT_EQ(id, outer_c->x);
+                    EXPECT_EQ(inner_id, inner_c->x);
+                    if (id == inner_id) {
+                        EXPECT_EQ(outer_c->x, inner_c->x);
+                        EXPECT_EQ(outer_c->y, inner_c->y);
+                    } else {
+                        EXPECT_NE(outer_c->x, inner_c->x);
+                        EXPECT_NE(outer_c->y, inner_c->y);
+                    }
+                }, c, false);
+        }, &manager, true);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+}
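
The test nests a sequential inner forMatchingSignature (useThreadPool = false) inside a threaded outer one (useThreadPool = true), passing each entity's C0 as the inner call's user data so the expectations can compare the outer entity's component against every inner entity's. It assumes a component roughly like the following (hypothetical sketch; the real C0 is defined earlier in the test suite):

struct C0 {
    int x = 0;
    int y = 0;

    C0() = default;
    C0(int x, int y) : x(x), y(y) {}
};

Since addComponent<C0>(entity, entity, entity) stores each entity's own index in both fields, equal IDs must see equal fields and distinct IDs must not. The trailing one-second sleep presumably gives any detached extra threads time to finish before the manager is destroyed.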