Merge branch 'threadpool'

This commit is contained in:
Stephen Seo 2021-09-07 12:07:05 +09:00
commit 99fef060d5
6 changed files with 794 additions and 432 deletions

2
.gitignore vendored
View file

@ -4,4 +4,4 @@ build*/
doxygen_html/ doxygen_html/
compile_commands.json compile_commands.json
tags tags
.clangd/ .cache/

View file

@ -14,7 +14,9 @@ set(EntityComponentSystem_HEADERS
EC/Meta/Meta.hpp EC/Meta/Meta.hpp
EC/Bitset.hpp EC/Bitset.hpp
EC/Manager.hpp EC/Manager.hpp
EC/EC.hpp) EC/EC.hpp
EC/ThreadPool.hpp
)
set(WillFailCompile_SOURCES set(WillFailCompile_SOURCES
test/WillFailCompileTest.cpp) test/WillFailCompileTest.cpp)
@ -48,7 +50,9 @@ if(GTEST_FOUND)
set(UnitTests_SOURCES set(UnitTests_SOURCES
test/MetaTest.cpp test/MetaTest.cpp
test/ECTest.cpp test/ECTest.cpp
test/Main.cpp) test/ThreadPoolTest.cpp
test/Main.cpp
)
add_executable(UnitTests ${UnitTests_SOURCES}) add_executable(UnitTests ${UnitTests_SOURCES})
target_link_libraries(UnitTests EntityComponentSystem ${GTEST_LIBRARIES}) target_link_libraries(UnitTests EntityComponentSystem ${GTEST_LIBRARIES})

File diff suppressed because it is too large Load diff

191
src/EC/ThreadPool.hpp Normal file
View file

@ -0,0 +1,191 @@
#ifndef EC_META_SYSTEM_THREADPOOL_HPP
#define EC_META_SYSTEM_THREADPOOL_HPP
#include <type_traits>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <tuple>
#include <chrono>
namespace EC {
namespace Internal {
using TPFnType = std::function<void(void*)>;
using TPTupleType = std::tuple<TPFnType, void*>;
using TPQueueType = std::queue<TPTupleType>;
} // namespace Internal
/*!
\brief Implementation of a Thread Pool.
Note that if SIZE is less than 2, then ThreadPool will not create threads and
run queued functions on the calling thread.
*/
template <unsigned int SIZE>
class ThreadPool {
public:
using THREADCOUNT = std::integral_constant<int, SIZE>;
ThreadPool() : waitCount(0) {
isAlive.store(true);
if(SIZE >= 2) {
for(unsigned int i = 0; i < SIZE; ++i) {
threads.emplace_back([] (std::atomic_bool *isAlive,
std::condition_variable *cv,
std::mutex *cvMutex,
Internal::TPQueueType *fnQueue,
std::mutex *queueMutex,
int *waitCount,
std::mutex *waitCountMutex) {
bool hasFn = false;
Internal::TPTupleType fnTuple;
while(isAlive->load()) {
hasFn = false;
{
std::lock_guard<std::mutex> lock(*queueMutex);
if(!fnQueue->empty()) {
fnTuple = fnQueue->front();
fnQueue->pop();
hasFn = true;
}
}
if(hasFn) {
std::get<0>(fnTuple)(std::get<1>(fnTuple));
continue;
}
{
std::lock_guard<std::mutex> lock(*waitCountMutex);
*waitCount += 1;
}
{
std::unique_lock<std::mutex> lock(*cvMutex);
cv->wait(lock);
}
{
std::lock_guard<std::mutex> lock(*waitCountMutex);
*waitCount -= 1;
}
}
}, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount,
&waitCountMutex);
}
}
}
~ThreadPool() {
if(SIZE >= 2) {
isAlive.store(false);
std::this_thread::sleep_for(std::chrono::milliseconds(20));
cv.notify_all();
for(auto &thread : threads) {
thread.join();
}
}
}
/*!
\brief Queues a function to be called (doesn't start calling yet).
To run the queued functions, wakeThreads() must be called to wake the
waiting threads which will start pulling functions from the queue to be
called.
*/
void queueFn(std::function<void(void*)>&& fn, void *ud = nullptr) {
std::lock_guard<std::mutex> lock(queueMutex);
fnQueue.emplace(std::make_tuple(fn, ud));
}
/*!
\brief Wakes waiting threads to start running queued functions.
If SIZE is less than 2, then this function call will block until all the
queued functions have been executed on the calling thread.
If SIZE is 2 or greater, then this function will return immediately after
waking one or all threads, depending on the given boolean parameter.
*/
void wakeThreads(bool wakeAll = true) {
if(SIZE >= 2) {
// wake threads to pull functions from queue and run them
if(wakeAll) {
cv.notify_all();
} else {
cv.notify_one();
}
} else {
// pull functions from queue and run them on main thread
Internal::TPTupleType fnTuple;
bool hasFn;
do {
{
std::lock_guard<std::mutex> lock(queueMutex);
if(!fnQueue.empty()) {
hasFn = true;
fnTuple = fnQueue.front();
fnQueue.pop();
} else {
hasFn = false;
}
}
if(hasFn) {
std::get<0>(fnTuple)(std::get<1>(fnTuple));
}
} while(hasFn);
}
}
/*!
\brief Gets the number of waiting threads.
If all threads are waiting, this should equal ThreadCount.
If SIZE is less than 2, then this will always return 0.
*/
int getWaitCount() {
std::lock_guard<std::mutex> lock(waitCountMutex);
return waitCount;
}
/*!
\brief Returns true if all threads are waiting.
If SIZE is less than 2, then this will always return true.
*/
bool isAllThreadsWaiting() {
if(SIZE >= 2) {
std::lock_guard<std::mutex> lock(waitCountMutex);
return waitCount == THREADCOUNT::value;
} else {
return true;
}
}
/*!
\brief Returns true if the function queue is empty.
*/
bool isQueueEmpty() {
std::lock_guard<std::mutex> lock(queueMutex);
return fnQueue.empty();
}
private:
std::vector<std::thread> threads;
std::atomic_bool isAlive;
std::condition_variable cv;
std::mutex cvMutex;
Internal::TPQueueType fnQueue;
std::mutex queueMutex;
int waitCount;
std::mutex waitCountMutex;
};
} // namespace EC
#endif

View file

@ -464,7 +464,7 @@ TEST(EC, MultiThreaded)
c->y = 2; c->y = 2;
}, },
nullptr, nullptr,
2 true
); );
for(unsigned int i = 0; i < 17; ++i) for(unsigned int i = 0; i < 17; ++i)
@ -490,7 +490,7 @@ TEST(EC, MultiThreaded)
c->y = 4; c->y = 4;
}, },
nullptr, nullptr,
8 true
); );
for(unsigned int i = 0; i < 3; ++i) for(unsigned int i = 0; i < 3; ++i)
@ -516,7 +516,7 @@ TEST(EC, MultiThreaded)
} }
); );
manager.callForMatchingFunctions(2); manager.callForMatchingFunctions(true);
for(unsigned int i = 0; i < 17; ++i) for(unsigned int i = 0; i < 17; ++i)
{ {
@ -531,7 +531,7 @@ TEST(EC, MultiThreaded)
} }
); );
manager.callForMatchingFunction(f1, 4); manager.callForMatchingFunction(f1, true);
for(unsigned int i = 0; i < 17; ++i) for(unsigned int i = 0; i < 17; ++i)
{ {
@ -544,7 +544,7 @@ TEST(EC, MultiThreaded)
manager.deleteEntity(i); manager.deleteEntity(i);
} }
manager.callForMatchingFunction(f0, 8); manager.callForMatchingFunction(f0, true);
for(unsigned int i = 0; i < 4; ++i) for(unsigned int i = 0; i < 4; ++i)
{ {
@ -710,7 +710,7 @@ TEST(EC, ForMatchingSignatures)
c0->y = 2; c0->y = 2;
}), }),
nullptr, nullptr,
3 true
); );
for(auto iter = cx.begin(); iter != cx.end(); ++iter) for(auto iter = cx.begin(); iter != cx.end(); ++iter)
@ -850,7 +850,7 @@ TEST(EC, forMatchingPtrs)
&func0 &func0
); );
manager.forMatchingSignaturePtr<TypeList<C0, T0> >( manager.forMatchingSignaturePtr<TypeList<C0, T0> >(
&func1 &func1, nullptr, true
); );
for(auto eid : e) for(auto eid : e)
@ -1098,7 +1098,7 @@ TEST(EC, forMatchingSimple) {
C0 *c0 = manager->getEntityData<C0>(id); C0 *c0 = manager->getEntityData<C0>(id);
c0->x += 10; c0->x += 10;
c0->y += 10; c0->y += 10;
}, nullptr, 3); }, nullptr, true);
// verify // verify
{ {
@ -1296,7 +1296,7 @@ TEST(EC, forMatchingIterableFn)
c->x += 100; c->x += 100;
c->y += 100; c->y += 100;
}; };
manager.forMatchingIterable(iterable, fn, nullptr, 3); manager.forMatchingIterable(iterable, fn, nullptr, true);
} }
{ {
@ -1322,7 +1322,7 @@ TEST(EC, forMatchingIterableFn)
c->x += 1000; c->x += 1000;
c->y += 1000; c->y += 1000;
}; };
manager.forMatchingIterable(iterable, fn, nullptr, 3); manager.forMatchingIterable(iterable, fn, nullptr, true);
} }
{ {
@ -1365,8 +1365,35 @@ TEST(EC, MultiThreadedForMatching) {
EXPECT_TRUE(manager.isAlive(first)); EXPECT_TRUE(manager.isAlive(first));
EXPECT_TRUE(manager.isAlive(second)); EXPECT_TRUE(manager.isAlive(second));
manager.callForMatchingFunction(fnIdx, 2); manager.callForMatchingFunction(fnIdx, true);
EXPECT_TRUE(manager.isAlive(first)); EXPECT_TRUE(manager.isAlive(first));
EXPECT_FALSE(manager.isAlive(second)); EXPECT_FALSE(manager.isAlive(second));
} }
TEST(EC, ManagerWithLowThreadCount) {
EC::Manager<ListComponentsAll, ListTagsAll, 1> manager;
std::array<std::size_t, 10> entities;
for(auto &id : entities) {
id = manager.addEntity();
manager.addComponent<C0>(id);
}
for(const auto &id : entities) {
auto *component = manager.getEntityComponent<C0>(id);
EXPECT_EQ(component->x, 0);
EXPECT_EQ(component->y, 0);
}
manager.forMatchingSignature<EC::Meta::TypeList<C0> >([] (std::size_t /*id*/, void* /*ud*/, C0 *c) {
c->x += 1;
c->y += 1;
}, nullptr, true);
for(const auto &id : entities) {
auto *component = manager.getEntityComponent<C0>(id);
EXPECT_EQ(component->x, 1);
EXPECT_EQ(component->y, 1);
}
}

View file

@ -0,0 +1,68 @@
#include <gtest/gtest.h>
#include <EC/ThreadPool.hpp>
using OneThreadPool = EC::ThreadPool<1>;
using ThreeThreadPool = EC::ThreadPool<3>;
TEST(ECThreadPool, CannotCompile) {
OneThreadPool p;
std::atomic_int data;
data.store(0);
const auto fn = [](void *ud) {
auto *data = static_cast<std::atomic_int*>(ud);
data->fetch_add(1);
};
p.queueFn(fn, &data);
p.wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() && !p.isAllThreadsWaiting());
ASSERT_EQ(data.load(), 1);
for(unsigned int i = 0; i < 10; ++i) {
p.queueFn(fn, &data);
}
p.wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() && !p.isAllThreadsWaiting());
ASSERT_EQ(data.load(), 11);
}
TEST(ECThreadPool, Simple) {
ThreeThreadPool p;
std::atomic_int data;
data.store(0);
const auto fn = [](void *ud) {
auto *data = static_cast<std::atomic_int*>(ud);
data->fetch_add(1);
};
p.queueFn(fn, &data);
p.wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() && !p.isAllThreadsWaiting());
ASSERT_EQ(data.load(), 1);
for(unsigned int i = 0; i < 10; ++i) {
p.queueFn(fn, &data);
}
p.wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() && !p.isAllThreadsWaiting());
ASSERT_EQ(data.load(), 11);
}