From 2e9e18a964e404b7052c588614d0f381743fc598 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 6 Sep 2021 15:54:24 +0900 Subject: [PATCH 1/7] Impl ThreadPool --- .gitignore | 2 +- src/CMakeLists.txt | 8 ++- src/EC/ThreadPool.hpp | 123 ++++++++++++++++++++++++++++++++++++ src/test/ThreadPoolTest.cpp | 41 ++++++++++++ 4 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 src/EC/ThreadPool.hpp create mode 100644 src/test/ThreadPoolTest.cpp diff --git a/.gitignore b/.gitignore index a087dc2..46ea9c4 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ build*/ doxygen_html/ compile_commands.json tags -.clangd/ +.cache/ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0244e2b..186d0da 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,7 +14,9 @@ set(EntityComponentSystem_HEADERS EC/Meta/Meta.hpp EC/Bitset.hpp EC/Manager.hpp - EC/EC.hpp) + EC/EC.hpp + EC/ThreadPool.hpp +) set(WillFailCompile_SOURCES test/WillFailCompileTest.cpp) @@ -48,7 +50,9 @@ if(GTEST_FOUND) set(UnitTests_SOURCES test/MetaTest.cpp test/ECTest.cpp - test/Main.cpp) + test/ThreadPoolTest.cpp + test/Main.cpp + ) add_executable(UnitTests ${UnitTests_SOURCES}) target_link_libraries(UnitTests EntityComponentSystem ${GTEST_LIBRARIES}) diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp new file mode 100644 index 0000000..30969e1 --- /dev/null +++ b/src/EC/ThreadPool.hpp @@ -0,0 +1,123 @@ +#ifndef EC_META_SYSTEM_THREADPOOL_HPP +#define EC_META_SYSTEM_THREADPOOL_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace EC { + +namespace Internal { + using TPFnType = std::function; + using TPTupleType = std::tuple; + using TPQueueType = std::queue; +} // namespace Internal + +template +class ThreadPool; + +template +class ThreadPool= 2)>::type> { +public: + using THREADCOUNT = std::integral_constant; + + ThreadPool() : waitCount(0) { + isAlive.store(true); + for(unsigned int i = 0; i < SIZE; ++i) { + threads.emplace_back([] (std::atomic_bool *isAlive, + std::condition_variable *cv, + std::mutex *cvMutex, + Internal::TPQueueType *fnQueue, + std::mutex *queueMutex, + int *waitCount, + std::mutex *waitCountMutex) { + bool hasFn = false; + Internal::TPTupleType fnTuple; + while(isAlive->load()) { + hasFn = false; + { + std::lock_guard lock(*queueMutex); + if(!fnQueue->empty()) { + fnTuple = fnQueue->front(); + fnQueue->pop(); + hasFn = true; + } + } + if(hasFn) { + std::get<0>(fnTuple)(std::get<1>(fnTuple)); + continue; + } + + { + std::lock_guard lock(*waitCountMutex); + *waitCount += 1; + } + { + std::unique_lock lock(*cvMutex); + cv->wait(lock); + } + { + std::lock_guard lock(*waitCountMutex); + *waitCount -= 1; + } + } + }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount, &waitCountMutex); + } + } + + ~ThreadPool() { + isAlive.store(false); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + cv.notify_all(); + for(auto &thread : threads) { + thread.join(); + } + } + + void queueFn(std::function&& fn, void *ud = nullptr) { + std::lock_guard lock(queueMutex); + fnQueue.emplace(std::make_tuple(fn, ud)); + } + + void wakeThreads(bool wakeAll = true) { + unsigned int counter = 0; + if(wakeAll) { + cv.notify_all(); + } else { + cv.notify_one(); + } + while(isAllThreadsWaiting() && counter++ < 10000) {} + } + + int getWaitCount() { + std::lock_guard lock(waitCountMutex); + return waitCount; + } + + bool isAllThreadsWaiting() { + std::lock_guard lock(waitCountMutex); + return waitCount == THREADCOUNT::value; + } + +private: + std::vector threads; + std::atomic_bool isAlive; + std::condition_variable cv; + std::mutex cvMutex; + Internal::TPQueueType fnQueue; + std::mutex queueMutex; + int waitCount; + std::mutex waitCountMutex; + +}; + +} // namespace EC + +#endif diff --git a/src/test/ThreadPoolTest.cpp b/src/test/ThreadPoolTest.cpp new file mode 100644 index 0000000..ef7a695 --- /dev/null +++ b/src/test/ThreadPoolTest.cpp @@ -0,0 +1,41 @@ +#include + +#include + +//using OneThreadPool = EC::ThreadPool<1>; +using ThreeThreadPool = EC::ThreadPool<3>; + +//TEST(ECThreadPool, CannotCompile) { +// OneThreadPool tp; +//} + +TEST(ECThreadPool, Simple) { + ThreeThreadPool p{}; + std::atomic_int data; + data.store(0); + const auto fn = [](void *ud) { + auto *data = static_cast(ud); + data->fetch_add(1); + }; + + p.queueFn(fn, &data); + + p.wakeThreads(); + + while(!p.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(data.load(), 1); + + for(unsigned int i = 0; i < 10; ++i) { + p.queueFn(fn, &data); + } + p.wakeThreads(); + + while(!p.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(data.load(), 11); +} From f44d2f8c7b2cfb342a442ef9160a03356d6a2b4f Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 6 Sep 2021 19:52:23 +0900 Subject: [PATCH 2/7] WIP convert Manager to use ThreadPool valgrind seems to report memory issues, and documentation may need more updating. --- src/EC/Manager.hpp | 813 +++++++++++++++++++++++--------------------- src/test/ECTest.cpp | 22 +- 2 files changed, 436 insertions(+), 399 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index 8e4b08c..225ceb8 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -36,6 +36,8 @@ #include "Meta/IndexOf.hpp" #include "Bitset.hpp" +#include "ThreadPool.hpp" + namespace EC { /*! @@ -46,12 +48,20 @@ namespace EC Note that all components must have a default constructor. + An optional third template parameter may be given, which is the size of + the number of threads in the internal ThreadPool, and it must be at + least 2. If ThreadCount is 1 or less, the program will fail to compile. + Note that using the internal ThreadPool is optional; several member + functions of Manager accept a boolean indicating if the internal + ThreadPool is to be used. Always passing false for that value will + result in never using the ThreadPool. + Example: \code{.cpp} EC::Manager, TypeList> manager; \endcode */ - template + template struct Manager { public: @@ -83,6 +93,8 @@ namespace EC std::size_t currentSize = 0; std::unordered_set deletedSet; + ThreadPool threadPool; + public: /*! \brief Initializes the manager with a default capacity. @@ -516,11 +528,11 @@ namespace EC const std::size_t& entityID, CType& ctype, Function&& function, - void* context = nullptr) + void* userData = nullptr) { function( entityID, - context, + userData, ctype.template getEntityData(entityID)... ); } @@ -530,11 +542,11 @@ namespace EC const std::size_t& entityID, CType& ctype, Function* function, - void* context = nullptr) + void* userData= nullptr) { (*function)( entityID, - context, + userData, ctype.template getEntityData(entityID)... ); } @@ -544,13 +556,13 @@ namespace EC const std::size_t& entityID, CType& ctype, Function&& function, - void* context = nullptr) const + void* userData = nullptr) const { ForMatchingSignatureHelper::call( entityID, ctype, std::forward(function), - context); + userData); } template @@ -558,13 +570,13 @@ namespace EC const std::size_t& entityID, CType& ctype, Function* function, - void* context = nullptr) const + void* userData = nullptr) const { ForMatchingSignatureHelper::callPtr( entityID, ctype, function, - context); + userData); } }; @@ -581,14 +593,15 @@ namespace EC The second parameter is default nullptr and will be passed to the function call as the second parameter as a means of providing - context (useful when the function is not a lambda function). The - third parameter is default 1 (not multi-threaded). If the third - parameter threadCount is set to a value greater than 1, then - threadCount threads will be used. Note that multi-threading is - based on splitting the task of calling the function across sections - of entities. Thus if there are only a small amount of entities in - the manager, then using multiple threads may not have as great of a - speed-up. + context (useful when the function is not a lambda function). + + The third parameter is default false (not multi-threaded). + Otherwise, if true, then the thread pool will be used to call the + given function in parallel across all entities. Note that + multi-threading is based on splitting the task of calling the + function across sections of entities. Thus if there are only a small + amount of entities in the manager, then using multiple threads may + not have as great of a speed-up. Example: \code{.cpp} @@ -609,8 +622,8 @@ namespace EC */ template void forMatchingSignature(Function&& function, - void* context = nullptr, - std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { using SignatureComponents = typename EC::Meta::Matching::type; @@ -621,7 +634,7 @@ namespace EC BitsetType signatureBitset = BitsetType::template generateBitset(); - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -634,52 +647,53 @@ namespace EC == signatureBitset) { Helper::call(i, *this, - std::forward(function), context); + std::forward(function), userData); } } } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, void*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &function, &signatureBitset, &context] - (std::size_t begin, - std::size_t end) { - for(std::size_t i = begin; i < end; ++i) - { - if(!std::get(this->entities[i])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &signatureBitset; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + threadPool.queueFn([&function] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - if((signatureBitset - & std::get(entities[i])) - == signatureBitset) - { - Helper::call(i, *this, - std::forward(function), context); + if((*std::get<2>(*data) + & std::get( + std::get<1>(*data)->at(i))) + == *std::get<2>(*data)) { + Helper::call(i, *std::get<0>(*data), std::forward(function), std::get<4>(*data)); } } - }, - begin, - end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -696,14 +710,15 @@ namespace EC The second parameter is default nullptr and will be passed to the function call as the second parameter as a means of providing - context (useful when the function is not a lambda function). The - third parameter is default 1 (not multi-threaded). If the third - parameter threadCount is set to a value greater than 1, then - threadCount threads will be used. Note that multi-threading is based - on splitting the task of calling the function across sections of - entities. Thus if there are only a small amount of entities in the - manager, then using multiple threads may not have as great of a - speed-up. + context (useful when the function is not a lambda function). + + The third parameter is default false (not multi-threaded). + Otherwise, if true, then the thread pool will be used to call the + given function in parallel across all entities. Note that + multi-threading is based on splitting the task of calling the + function across sections of entities. Thus if there are only a small + amount of entities in the manager, then using multiple threads may + not have as great of a speed-up. Example: \code{.cpp} @@ -726,8 +741,8 @@ namespace EC */ template void forMatchingSignaturePtr(Function* function, - void* context = nullptr, - std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { using SignatureComponents = typename EC::Meta::Matching::type; @@ -738,7 +753,7 @@ namespace EC BitsetType signatureBitset = BitsetType::template generateBitset(); - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -750,51 +765,54 @@ namespace EC if((signatureBitset & std::get(entities[i])) == signatureBitset) { - Helper::callPtr(i, *this, function, context); + Helper::callPtr(i, *this, function, userData); } } } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, void*, Function*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &function, &signatureBitset, &context] - (std::size_t begin, - std::size_t end) { - for(std::size_t i = begin; i < end; ++i) - { - if(!std::get(this->entities[i])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &signatureBitset; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + std::get<5>(fnDataAr.at(i)) = function; + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - if((signatureBitset - & std::get(entities[i])) - == signatureBitset) - { - Helper::callPtr(i, *this, function, context); + if((*std::get<2>(*data) + & std::get( + std::get<1>(*data)->at(i))) + == *std::get<2>(*data)) { + Helper::callPtr(i, *std::get<0>(*data), std::get<5>(*data), std::get<4>(*data)); } } - }, - begin, - end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -859,7 +877,7 @@ namespace EC template std::size_t addForMatchingFunction( Function&& function, - void* context = nullptr) + void* userData = nullptr) { while(forMatchingFunctions.find(functionIndex) != forMatchingFunctions.end()) @@ -882,54 +900,63 @@ namespace EC functionIndex, std::make_tuple( signatureBitset, - context, + userData, [function, helper, this] - (std::size_t threadCount, + (const bool useThreadPool, std::vector matching, - void* context) + void* userData) { - if(threadCount <= 1 || matching.size() < threadCount) + if(!useThreadPool) { for(auto eid : matching) { if(isAlive(eid)) { helper.callInstancePtr( - eid, *this, &function, context); + eid, *this, &function, userData); } } } else { - std::vector threads(threadCount); - std::size_t s = matching.size() / threadCount; - for(std::size_t i = 0; i < threadCount; ++ i) - { + using TPFnDataType = std::tuple, void*, const std::vector*>; + std::array fnDataAr; + + std::size_t s = matching.size() / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) { + if(i == ThreadCount - 1) { end = matching.size(); } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &function, &helper, &context, &matching] - (std::size_t begin, - std::size_t end) { - for(std::size_t j = begin; j < end; ++j) - { - if(isAlive(matching[j])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = {begin, end}; + std::get<3>(fnDataAr.at(i)) = userData; + std::get<4>(fnDataAr.at(i)) = &matching; + threadPool.queueFn([function, helper] (void* ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<2>(*data).at(0); + i < std::get<2>(*data).at(1); + ++i) { + if(std::get<0>(*data)->isAlive(std::get<4>(*data)->at(i))) { helper.callInstancePtr( - matching[j], *this, &function, context); + std::get<4>(*data)->at(i), + *std::get<0>(*data), + &function, + std::get<3>(*data)); } } - }, - begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } }))); @@ -939,11 +966,11 @@ namespace EC private: std::vector > getMatchingEntities( - std::vector bitsets, std::size_t threadCount = 1) + std::vector bitsets, const bool useThreadPool = false) { std::vector > matchingV(bitsets.size()); - if(threadCount <= 1 || currentSize <= threadCount) + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -963,63 +990,52 @@ namespace EC } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - std::mutex mutex; + using TPFnDataType = std::tuple, std::vector >*, const std::vector*, EntitiesType*, std::mutex*>; + std::array fnDataAr; - if(s == 0) { - for(std::size_t i = 0; i < currentSize; ++i) { - threads[i] = std::thread( - [this, &matchingV, &bitsets, &mutex] (std::size_t idx) { - if(!isAlive(idx)) { - return; + std::size_t s = currentSize / ThreadCount; + std::mutex mutex; + for(std::size_t i = 0; i < ThreadCount; ++i) { + std::size_t begin = s * i; + std::size_t end; + if(i == ThreadCount - 1) { + end = currentSize; + } else { + end = s * (i + 1); + } + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = {begin, end}; + std::get<2>(fnDataAr.at(i)) = &matchingV; + std::get<3>(fnDataAr.at(i)) = &bitsets; + std::get<4>(fnDataAr.at(i)) = &entities; + std::get<5>(fnDataAr.at(i)) = &mutex; + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<1>(*data).at(0); + i < std::get<1>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { + continue; } - for(std::size_t k = 0; k < bitsets.size(); ++k) - { - if(((*bitsets[k]) & - std::get(entities[idx])) - == (*bitsets[k])) - { - std::lock_guard guard(mutex); - matchingV[k].push_back(idx); + for(std::size_t j = 0; + j < std::get<3>(*data)->size(); + ++j) { + if(((*std::get<3>(*data)->at(j)) + & std::get(std::get<4>(*data)->at(i))) + == (*std::get<3>(*data)->at(j))) { + std::lock_guard lock(*std::get<5>(*data)); + std::get<2>(*data)->at(j).push_back(i); } } - }, i); - } - for(std::size_t i = 0; i < currentSize; ++i) { - threads[i].join(); - } - } else { - for (std::size_t i = 0; i < threadCount; ++i) { - std::size_t begin = s * i; - std::size_t end; - if (i == threadCount - 1) { - end = currentSize; - } else { - end = s * (i + 1); } - - threads[i] = std::thread( - [this, &matchingV, &bitsets, &mutex] - (std::size_t begin, std::size_t end) { - for (std::size_t j = begin; j < end; ++j) { - if (!isAlive(j)) { - continue; - } - for (std::size_t k = 0; k < bitsets.size(); ++k) { - if (((*bitsets[k]) & - std::get(entities[j])) - == (*bitsets[k])) { - std::lock_guard guard(mutex); - matchingV[k].push_back(j); - } - } - } - }, begin, end); - } - for (std::size_t i = 0; i < threadCount; ++i) { - threads[i].join(); - } + }, &fnDataAr.at(i)); + } + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -1031,12 +1047,13 @@ namespace EC /*! \brief Call all stored functions. - The first (and only) parameter can be optionally used to specify the - number of threads to use when calling the functions. Otherwise, this - function is by default not multi-threaded. - Note that multi-threading is based on splitting the task of calling - the functions across sections of entities. Thus if there are only - a small amount of entities in the manager, then using multiple + The first (and only) parameter can be optionally used to enable the + use of the internal ThreadPool to call all stored functions in + parallel. Using the value false (which is the default) will not use + the ThreadPool and run all stored functions sequentially on the main + thread. Note that multi-threading is based on splitting the task of + calling the functions across sections of entities. Thus if there are + only a small amount of entities in the manager, then using multiple threads may not have as great of a speed-up. Example: @@ -1058,7 +1075,7 @@ namespace EC manager.clearForMatchingFunctions(); \endcode */ - void callForMatchingFunctions(std::size_t threadCount = 1) + void callForMatchingFunctions(const bool useThreadPool = false) { std::vector bitsets; for(auto iter = forMatchingFunctions.begin(); @@ -1069,7 +1086,7 @@ namespace EC } std::vector > matching = - getMatchingEntities(bitsets, threadCount); + getMatchingEntities(bitsets, useThreadPool); std::size_t i = 0; for(auto iter = forMatchingFunctions.begin(); @@ -1077,20 +1094,21 @@ namespace EC ++iter) { std::get<2>(iter->second)( - threadCount, matching[i++], std::get<1>(iter->second)); + useThreadPool, matching[i++], std::get<1>(iter->second)); } } /*! \brief Call a specific stored function. - A second parameter can be optionally used to specify the number - of threads to use when calling the function. Otherwise, this - function is by default not multi-threaded. - Note that multi-threading is based on splitting the task of calling - the function across sections of entities. Thus if there are only - a small amount of entities in the manager, then using multiple - threads may not have as great of a speed-up. + The second parameter can be optionally used to enable the use of the + internal ThreadPool to call the stored function in parallel. Using + the value false (which is the default) will not use the ThreadPool + and run the stored function sequentially on the main thread. Note + that multi-threading is based on splitting the task of calling the + functions across sections of entities. Thus if there are only a + small amount of entities in the manager, then using multiple threads + may not have as great of a speed-up. Example: \code{.cpp} @@ -1110,7 +1128,7 @@ namespace EC \return False if a function with the given id does not exist. */ bool callForMatchingFunction(std::size_t id, - std::size_t threadCount = 1) + const bool useThreadPool = false) { auto iter = forMatchingFunctions.find(id); if(iter == forMatchingFunctions.end()) @@ -1119,9 +1137,9 @@ namespace EC } std::vector > matching = getMatchingEntities(std::vector{ - &std::get(iter->second)}, threadCount); + &std::get(iter->second)}, useThreadPool); std::get<2>(iter->second)( - threadCount, matching[0], std::get<1>(iter->second)); + useThreadPool, matching[0], std::get<1>(iter->second)); return true; } @@ -1263,12 +1281,12 @@ namespace EC \return True if id is valid and context was updated */ - bool changeForMatchingFunctionContext(std::size_t id, void* context) + bool changeForMatchingFunctionContext(std::size_t id, void* userData) { auto f = forMatchingFunctions.find(id); if(f != forMatchingFunctions.end()) { - std::get<1>(f->second) = context; + std::get<1>(f->second) = userData; return true; } return false; @@ -1317,11 +1335,11 @@ namespace EC template void forMatchingSignatures( FTuple fTuple, - void* context = nullptr, - const std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { - std::vector > multiMatchingEntities( - SigList::size); + std::vector > + multiMatchingEntities(SigList::size); BitsetType signatureBitsets[SigList::size]; // generate bitsets for each signature @@ -1333,7 +1351,7 @@ namespace EC }); // find and store entities matching signatures - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t eid = 0; eid < currentSize; ++eid) { @@ -1354,48 +1372,49 @@ namespace EC } else { - std::vector threads(threadCount); - std::mutex mutexes[SigList::size]; - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, std::vector >*, BitsetType*, std::mutex*>; + std::array fnDataAr; + + std::mutex mutex; + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &mutexes, &multiMatchingEntities, &signatureBitsets] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; ++j) - { - if(!isAlive(j)) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = {begin, end}; + std::get<2>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<3>(fnDataAr.at(i)) = signatureBitsets; + std::get<4>(fnDataAr.at(i)) = &mutex; + + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<1>(*data).at(0); + i < std::get<1>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - for(std::size_t k = 0; k < SigList::size; ++k) - { - if((signatureBitsets[k] - & std::get(entities[j])) - == signatureBitsets[k]) - { - std::lock_guard guard( - mutexes[k]); - multiMatchingEntities[k].push_back(j); + for(std::size_t j = 0; j < SigList::size; ++j) { + if((std::get<3>(*data)[j] & std::get(std::get<0>(*data)->entities[i])) + == std::get<3>(*data)[j]) { + std::lock_guard lock(*std::get<4>(*data)); + std::get<2>(*data)->at(j).push_back(i); } } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -1403,7 +1422,7 @@ namespace EC EC::Meta::forEachDoubleTuple( EC::Meta::Morph >{}, fTuple, - [this, &multiMatchingEntities, &threadCount, &context] + [this, &multiMatchingEntities, useThreadPool, &userData] (auto sig, auto func, auto index) { using SignatureComponents = @@ -1413,55 +1432,51 @@ namespace EC EC::Meta::Morph< SignatureComponents, ForMatchingSignatureHelper<> >; - if(threadCount <= 1) - { - for(const auto& id : multiMatchingEntities[index]) - { - if(isAlive(id)) - { - Helper::call(id, *this, func, context); + if(!useThreadPool) { + for(const auto& id : multiMatchingEntities[index]) { + if(isAlive(id)) { + Helper::call(id, *this, func, userData); } } - } - else - { - std::vector threads(threadCount); + } else { + using TPFnType = std::tuple, std::vector > *, std::size_t>; + std::array fnDataAr; std::size_t s = multiMatchingEntities[index].size() - / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + / ThreadCount; + for(unsigned int i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = multiMatchingEntities[index].size(); - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &multiMatchingEntities, &index, &func, - &context] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; - ++j) - { - if(isAlive(multiMatchingEntities[index][j])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = userData; + std::get<2>(fnDataAr.at(i)) = {begin, end}; + std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<4>(fnDataAr.at(i)) = index; + threadPool.queueFn([&func] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<2>(*data).at(0); + i < std::get<2>(*data).at(1); + ++i) { + if(std::get<0>(*data)->isAlive(std::get<3>(*data)->at(std::get<4>(*data)).at(i))) { Helper::call( - multiMatchingEntities[index][j], - *this, + std::get<3>(*data)->at(std::get<4>(*data)).at(i), + *std::get<0>(*data), func, - context); + std::get<1>(*data)); } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -1513,8 +1528,8 @@ namespace EC */ template void forMatchingSignaturesPtr(FTuple fTuple, - void* context = nullptr, - std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { std::vector > multiMatchingEntities( SigList::size); @@ -1529,7 +1544,7 @@ namespace EC }); // find and store entities matching signatures - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t eid = 0; eid < currentSize; ++eid) { @@ -1550,48 +1565,49 @@ namespace EC } else { - std::vector threads(threadCount); - std::mutex mutexes[SigList::size]; - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, std::vector >*, BitsetType*, std::mutex*>; + std::array fnDataAr; + + std::mutex mutex; + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &mutexes, &multiMatchingEntities, &signatureBitsets] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; ++j) - { - if(!isAlive(j)) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = {begin, end}; + std::get<2>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<3>(fnDataAr.at(i)) = signatureBitsets; + std::get<4>(fnDataAr.at(i)) = &mutex; + + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<1>(*data).at(0); + i < std::get<1>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - for(std::size_t k = 0; k < SigList::size; ++k) - { - if((signatureBitsets[k] - & std::get(entities[j])) - == signatureBitsets[k]) - { - std::lock_guard guard( - mutexes[k]); - multiMatchingEntities[k].push_back(j); + for(std::size_t j = 0; j < SigList::size; ++j) { + if((std::get<3>(*data)[j] & std::get(std::get<0>(*data)->entities[i])) + == std::get<3>(*data)[j]) { + std::lock_guard lock(*std::get<4>(*data)); + std::get<2>(*data)->at(j).push_back(i); } } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -1599,7 +1615,7 @@ namespace EC EC::Meta::forEachDoubleTuple( EC::Meta::Morph >{}, fTuple, - [this, &multiMatchingEntities, &threadCount, &context] + [this, &multiMatchingEntities, useThreadPool, &userData] (auto sig, auto func, auto index) { using SignatureComponents = @@ -1609,55 +1625,56 @@ namespace EC EC::Meta::Morph< SignatureComponents, ForMatchingSignatureHelper<> >; - if(threadCount <= 1) + if(!useThreadPool) { for(const auto& id : multiMatchingEntities[index]) { if(isAlive(id)) { - Helper::callPtr(id, *this, func, context); + Helper::callPtr(id, *this, func, userData); } } } else { - std::vector threads(threadCount); + using TPFnType = std::tuple, std::vector > *, std::size_t>; + std::array fnDataAr; std::size_t s = multiMatchingEntities[index].size() - / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + / ThreadCount; + for(unsigned int i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = multiMatchingEntities[index].size(); - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &multiMatchingEntities, &index, &func, - &context] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; - ++j) - { - if(isAlive(multiMatchingEntities[index][j])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = userData; + std::get<2>(fnDataAr.at(i)) = {begin, end}; + std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<4>(fnDataAr.at(i)) = index; + threadPool.queueFn([&func] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<2>(*data).at(0); + i < std::get<2>(*data).at(1); + ++i) { + if(std::get<0>(*data)->isAlive(std::get<3>(*data)->at(std::get<4>(*data)).at(i))) { Helper::callPtr( - multiMatchingEntities[index][j], - *this, + std::get<3>(*data)->at(std::get<4>(*data)).at(i), + *std::get<0>(*data), func, - context); + std::get<1>(*data)); } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -1675,9 +1692,9 @@ namespace EC * query component/tag data. */ template - void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, std::size_t threadCount = 1) { + void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) { const BitsetType signatureBitset = BitsetType::template generateBitset(); - if(threadCount <= 1) { + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { if(!std::get(entities[i])) { continue; @@ -1686,36 +1703,42 @@ namespace EC } } } else { - std::vector threads(threadCount); - const std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) { - const std::size_t begin = s * i; - const std::size_t end = - i == threadCount - 1 ? - currentSize : - s * (i + 1); - threads[i] = std::thread( - [this] (const std::size_t begin, - const std::size_t end, - const BitsetType signatureBitset, - ForMatchingFn fn, - void *userData) { - for(std::size_t i = begin; i < end; ++i) { - if(!std::get(entities[i])) { - continue; - } else if((signatureBitset & std::get(entities[i])) == signatureBitset) { - fn(i, this, userData); - } + using TPFnDataType = std::tuple, void*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { + std::size_t begin = s * i; + std::size_t end; + if(i == ThreadCount - 1) { + end = currentSize; + } else { + end = s * (i + 1); + } + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &signatureBitset; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + threadPool.queueFn([&fn] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { + continue; + } else if((*std::get<2>(*data) & std::get(std::get<1>(*data)->at(i))) == *std::get<2>(*data)) { + fn(i, std::get<0>(*data), std::get<4>(*data)); } - }, - begin, - end, - signatureBitset, - fn, - userData); + } + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -1730,8 +1753,8 @@ namespace EC * defined typedef of type ForMatchingFn. */ template - void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userPtr = nullptr, std::size_t threadCount = 1) { - if(threadCount <= 1) { + void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) { + if(!useThreadPool) { bool isValid; for(std::size_t i = 0; i < currentSize; ++i) { if(!std::get(entities[i])) { @@ -1747,41 +1770,55 @@ namespace EC } if(!isValid) { continue; } - fn(i, this, userPtr); + fn(i, this, userData); } } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) { + using TPFnDataType = std::tuple, void*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; - std::size_t end = - i == threadCount - 1 ? - currentSize : - s * (i + 1); - threads[i] = std::thread( - [this, &fn, &iterable, userPtr] (std::size_t begin, std::size_t end) { - bool isValid; - for(std::size_t i = begin; i < end; ++i) { - if(!std::get(this->entities[i])) { - continue; - } - - isValid = true; - for(const auto& integralValue : iterable) { - if(!std::get(entities[i]).getCombinedBit(integralValue)) { - isValid = false; - break; - } - } - if(!isValid) { continue; } - - fn(i, this, userPtr); + std::size_t end; + if(i == ThreadCount - 1) { + end = currentSize; + } else { + end = s * (i + 1); + } + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &iterable; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + threadPool.queueFn([&fn] (void *ud) { + auto *data = static_cast(ud); + bool isValid; + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { + continue; } - }, - begin, end); + isValid = true; + for(const auto& integralValue : *std::get<2>(*data)) { + if(!std::get(std::get<1>(*data)->at(i)).getCombinedBit(integralValue)) { + isValid = false; + break; + } + } + if(!isValid) { continue; } + + fn(i, std::get<0>(*data), std::get<4>(*data)); + + } + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } diff --git a/src/test/ECTest.cpp b/src/test/ECTest.cpp index 24b8b11..3105903 100644 --- a/src/test/ECTest.cpp +++ b/src/test/ECTest.cpp @@ -464,7 +464,7 @@ TEST(EC, MultiThreaded) c->y = 2; }, nullptr, - 2 + true ); for(unsigned int i = 0; i < 17; ++i) @@ -490,7 +490,7 @@ TEST(EC, MultiThreaded) c->y = 4; }, nullptr, - 8 + true ); for(unsigned int i = 0; i < 3; ++i) @@ -516,7 +516,7 @@ TEST(EC, MultiThreaded) } ); - manager.callForMatchingFunctions(2); + manager.callForMatchingFunctions(true); for(unsigned int i = 0; i < 17; ++i) { @@ -531,7 +531,7 @@ TEST(EC, MultiThreaded) } ); - manager.callForMatchingFunction(f1, 4); + manager.callForMatchingFunction(f1, true); for(unsigned int i = 0; i < 17; ++i) { @@ -544,7 +544,7 @@ TEST(EC, MultiThreaded) manager.deleteEntity(i); } - manager.callForMatchingFunction(f0, 8); + manager.callForMatchingFunction(f0, true); for(unsigned int i = 0; i < 4; ++i) { @@ -710,7 +710,7 @@ TEST(EC, ForMatchingSignatures) c0->y = 2; }), nullptr, - 3 + true ); for(auto iter = cx.begin(); iter != cx.end(); ++iter) @@ -850,7 +850,7 @@ TEST(EC, forMatchingPtrs) &func0 ); manager.forMatchingSignaturePtr >( - &func1 + &func1, nullptr, true ); for(auto eid : e) @@ -1098,7 +1098,7 @@ TEST(EC, forMatchingSimple) { C0 *c0 = manager->getEntityData(id); c0->x += 10; c0->y += 10; - }, nullptr, 3); + }, nullptr, true); // verify { @@ -1296,7 +1296,7 @@ TEST(EC, forMatchingIterableFn) c->x += 100; c->y += 100; }; - manager.forMatchingIterable(iterable, fn, nullptr, 3); + manager.forMatchingIterable(iterable, fn, nullptr, true); } { @@ -1322,7 +1322,7 @@ TEST(EC, forMatchingIterableFn) c->x += 1000; c->y += 1000; }; - manager.forMatchingIterable(iterable, fn, nullptr, 3); + manager.forMatchingIterable(iterable, fn, nullptr, true); } { @@ -1365,7 +1365,7 @@ TEST(EC, MultiThreadedForMatching) { EXPECT_TRUE(manager.isAlive(first)); EXPECT_TRUE(manager.isAlive(second)); - manager.callForMatchingFunction(fnIdx, 2); + manager.callForMatchingFunction(fnIdx, true); EXPECT_TRUE(manager.isAlive(first)); EXPECT_FALSE(manager.isAlive(second)); From 0f2a98b8867e706d2880f56f9ae7a5a369b9cf02 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 6 Sep 2021 20:57:13 +0900 Subject: [PATCH 3/7] Usage of ThreadPool fixes --- src/EC/Manager.hpp | 62 ++++++++++++++++++------------------- src/EC/ThreadPool.hpp | 2 -- src/test/ECTest.cpp | 5 +++ src/test/ThreadPoolTest.cpp | 8 ++--- 4 files changed, 40 insertions(+), 37 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index 225ceb8..cdc52aa 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -542,7 +542,7 @@ namespace EC const std::size_t& entityID, CType& ctype, Function* function, - void* userData= nullptr) + void* userData = nullptr) { (*function)( entityID, @@ -692,9 +692,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } } @@ -811,9 +811,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } } @@ -955,9 +955,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } }))); @@ -1034,9 +1034,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } return matchingV; @@ -1413,9 +1413,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } // call functions on matching entities @@ -1475,9 +1475,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } } ); @@ -1606,9 +1606,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } // call functions on matching entities @@ -1673,9 +1673,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } } ); @@ -1737,9 +1737,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } } @@ -1817,9 +1817,9 @@ namespace EC }, &fnDataAr.at(i)); } threadPool.wakeThreads(); - while(!threadPool.isAllThreadsWaiting()) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } while(!threadPool.isAllThreadsWaiting()); } } }; diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp index 30969e1..f6f9e36 100644 --- a/src/EC/ThreadPool.hpp +++ b/src/EC/ThreadPool.hpp @@ -87,13 +87,11 @@ public: } void wakeThreads(bool wakeAll = true) { - unsigned int counter = 0; if(wakeAll) { cv.notify_all(); } else { cv.notify_one(); } - while(isAllThreadsWaiting() && counter++ < 10000) {} } int getWaitCount() { diff --git a/src/test/ECTest.cpp b/src/test/ECTest.cpp index 3105903..a59fa98 100644 --- a/src/test/ECTest.cpp +++ b/src/test/ECTest.cpp @@ -458,6 +458,11 @@ TEST(EC, MultiThreaded) EXPECT_EQ(0, manager.getEntityData(i)->y); } +#ifndef NDEBUG + std::clog << "Addr of C0 for entity 0 is " << manager.getEntityData(0) + << std::endl; +#endif + manager.forMatchingSignature >( [] (const std::size_t& /* id */, void* /* context */, C0* c) { c->x = 1; diff --git a/src/test/ThreadPoolTest.cpp b/src/test/ThreadPoolTest.cpp index ef7a695..b695d49 100644 --- a/src/test/ThreadPoolTest.cpp +++ b/src/test/ThreadPoolTest.cpp @@ -22,9 +22,9 @@ TEST(ECThreadPool, Simple) { p.wakeThreads(); - while(!p.isAllThreadsWaiting()) { + do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } + } while(!p.isAllThreadsWaiting()); ASSERT_EQ(data.load(), 1); @@ -33,9 +33,9 @@ TEST(ECThreadPool, Simple) { } p.wakeThreads(); - while(!p.isAllThreadsWaiting()) { + do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } + } while(!p.isAllThreadsWaiting()); ASSERT_EQ(data.load(), 11); } From 13152d1c2221fd53a35a5f78ec00e41b12c6133e Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 6 Sep 2021 21:01:01 +0900 Subject: [PATCH 4/7] Tweak sleep times for threadpool to finish Remove debug print --- src/EC/Manager.hpp | 20 ++++++++++---------- src/test/ECTest.cpp | 5 ----- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index cdc52aa..ea07b04 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -693,7 +693,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } } @@ -812,7 +812,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } } @@ -956,7 +956,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } }))); @@ -1035,7 +1035,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } @@ -1414,7 +1414,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } @@ -1476,7 +1476,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } } @@ -1607,7 +1607,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } @@ -1674,7 +1674,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } } @@ -1738,7 +1738,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } } @@ -1818,7 +1818,7 @@ namespace EC } threadPool.wakeThreads(); do { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(200)); } while(!threadPool.isAllThreadsWaiting()); } } diff --git a/src/test/ECTest.cpp b/src/test/ECTest.cpp index a59fa98..3105903 100644 --- a/src/test/ECTest.cpp +++ b/src/test/ECTest.cpp @@ -458,11 +458,6 @@ TEST(EC, MultiThreaded) EXPECT_EQ(0, manager.getEntityData(i)->y); } -#ifndef NDEBUG - std::clog << "Addr of C0 for entity 0 is " << manager.getEntityData(0) - << std::endl; -#endif - manager.forMatchingSignature >( [] (const std::size_t& /* id */, void* /* context */, C0* c) { c->x = 1; From 16f410c8ef111ffcc69de3a1eee6ed9fadd832cb Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Tue, 7 Sep 2021 11:29:06 +0900 Subject: [PATCH 5/7] Add isQueueEmpty() to ThreadPool --- src/EC/Manager.hpp | 20 ++++++++++---------- src/EC/ThreadPool.hpp | 5 +++++ src/test/ThreadPoolTest.cpp | 4 ++-- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index ea07b04..4e5b57a 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -694,7 +694,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } } @@ -813,7 +813,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } } @@ -957,7 +957,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } }))); @@ -1036,7 +1036,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } return matchingV; @@ -1415,7 +1415,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } // call functions on matching entities @@ -1477,7 +1477,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } } ); @@ -1608,7 +1608,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } // call functions on matching entities @@ -1675,7 +1675,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } } ); @@ -1739,7 +1739,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } } @@ -1819,7 +1819,7 @@ namespace EC threadPool.wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isAllThreadsWaiting()); + } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); } } }; diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp index f6f9e36..aee681d 100644 --- a/src/EC/ThreadPool.hpp +++ b/src/EC/ThreadPool.hpp @@ -104,6 +104,11 @@ public: return waitCount == THREADCOUNT::value; } + bool isQueueEmpty() { + std::lock_guard lock(queueMutex); + return fnQueue.empty(); + } + private: std::vector threads; std::atomic_bool isAlive; diff --git a/src/test/ThreadPoolTest.cpp b/src/test/ThreadPoolTest.cpp index b695d49..2cec50c 100644 --- a/src/test/ThreadPoolTest.cpp +++ b/src/test/ThreadPoolTest.cpp @@ -24,7 +24,7 @@ TEST(ECThreadPool, Simple) { do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while(!p.isAllThreadsWaiting()); + } while(!p.isQueueEmpty() && !p.isAllThreadsWaiting()); ASSERT_EQ(data.load(), 1); @@ -35,7 +35,7 @@ TEST(ECThreadPool, Simple) { do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while(!p.isAllThreadsWaiting()); + } while(!p.isQueueEmpty() && !p.isAllThreadsWaiting()); ASSERT_EQ(data.load(), 11); } From 6a8902ad5106ccda803acd9139ae4039fb7572fd Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Tue, 7 Sep 2021 11:46:38 +0900 Subject: [PATCH 6/7] Allow ThreadPool to be created with < 2 ThreadCount --- src/EC/Manager.hpp | 85 +++++++++++------------ src/EC/ThreadPool.hpp | 130 ++++++++++++++++++++++-------------- src/test/ECTest.cpp | 27 ++++++++ src/test/ThreadPoolTest.cpp | 41 ++++++++++-- 4 files changed, 184 insertions(+), 99 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index 4e5b57a..a4eb821 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -93,7 +93,7 @@ namespace EC std::size_t currentSize = 0; std::unordered_set deletedSet; - ThreadPool threadPool; + std::unique_ptr > threadPool; public: /*! @@ -105,6 +105,9 @@ namespace EC Manager() { resize(EC_INIT_ENTITIES_SIZE); + if(ThreadCount >= 2) { + threadPool = std::make_unique >(); + } } private: @@ -634,7 +637,7 @@ namespace EC BitsetType signatureBitset = BitsetType::template generateBitset(); - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -673,7 +676,7 @@ namespace EC std::get<2>(fnDataAr.at(i)) = &signatureBitset; std::get<3>(fnDataAr.at(i)) = {begin, end}; std::get<4>(fnDataAr.at(i)) = userData; - threadPool.queueFn([&function] (void *ud) { + threadPool->queueFn([&function] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<3>(*data).at(0); i < std::get<3>(*data).at(1); @@ -691,10 +694,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } } @@ -753,7 +756,7 @@ namespace EC BitsetType signatureBitset = BitsetType::template generateBitset(); - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -792,7 +795,7 @@ namespace EC std::get<3>(fnDataAr.at(i)) = {begin, end}; std::get<4>(fnDataAr.at(i)) = userData; std::get<5>(fnDataAr.at(i)) = function; - threadPool.queueFn([] (void *ud) { + threadPool->queueFn([] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<3>(*data).at(0); i < std::get<3>(*data).at(1); @@ -810,10 +813,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } } @@ -906,7 +909,7 @@ namespace EC std::vector matching, void* userData) { - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(auto eid : matching) { @@ -939,7 +942,7 @@ namespace EC std::get<2>(fnDataAr.at(i)) = {begin, end}; std::get<3>(fnDataAr.at(i)) = userData; std::get<4>(fnDataAr.at(i)) = &matching; - threadPool.queueFn([function, helper] (void* ud) { + threadPool->queueFn([function, helper] (void* ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<2>(*data).at(0); i < std::get<2>(*data).at(1); @@ -954,10 +957,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } }))); @@ -970,7 +973,7 @@ namespace EC { std::vector > matchingV(bitsets.size()); - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -1012,7 +1015,7 @@ namespace EC std::get<3>(fnDataAr.at(i)) = &bitsets; std::get<4>(fnDataAr.at(i)) = &entities; std::get<5>(fnDataAr.at(i)) = &mutex; - threadPool.queueFn([] (void *ud) { + threadPool->queueFn([] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<1>(*data).at(0); i < std::get<1>(*data).at(1); @@ -1033,10 +1036,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } return matchingV; @@ -1351,7 +1354,7 @@ namespace EC }); // find and store entities matching signatures - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(std::size_t eid = 0; eid < currentSize; ++eid) { @@ -1394,7 +1397,7 @@ namespace EC std::get<3>(fnDataAr.at(i)) = signatureBitsets; std::get<4>(fnDataAr.at(i)) = &mutex; - threadPool.queueFn([] (void *ud) { + threadPool->queueFn([] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<1>(*data).at(0); i < std::get<1>(*data).at(1); @@ -1412,10 +1415,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } // call functions on matching entities @@ -1432,7 +1435,7 @@ namespace EC EC::Meta::Morph< SignatureComponents, ForMatchingSignatureHelper<> >; - if(!useThreadPool) { + if(!useThreadPool || !threadPool) { for(const auto& id : multiMatchingEntities[index]) { if(isAlive(id)) { Helper::call(id, *this, func, userData); @@ -1459,7 +1462,7 @@ namespace EC std::get<2>(fnDataAr.at(i)) = {begin, end}; std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities; std::get<4>(fnDataAr.at(i)) = index; - threadPool.queueFn([&func] (void *ud) { + threadPool->queueFn([&func] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<2>(*data).at(0); i < std::get<2>(*data).at(1); @@ -1474,10 +1477,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } } ); @@ -1544,7 +1547,7 @@ namespace EC }); // find and store entities matching signatures - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(std::size_t eid = 0; eid < currentSize; ++eid) { @@ -1587,7 +1590,7 @@ namespace EC std::get<3>(fnDataAr.at(i)) = signatureBitsets; std::get<4>(fnDataAr.at(i)) = &mutex; - threadPool.queueFn([] (void *ud) { + threadPool->queueFn([] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<1>(*data).at(0); i < std::get<1>(*data).at(1); @@ -1605,10 +1608,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } // call functions on matching entities @@ -1625,7 +1628,7 @@ namespace EC EC::Meta::Morph< SignatureComponents, ForMatchingSignatureHelper<> >; - if(!useThreadPool) + if(!useThreadPool || !threadPool) { for(const auto& id : multiMatchingEntities[index]) { @@ -1657,7 +1660,7 @@ namespace EC std::get<2>(fnDataAr.at(i)) = {begin, end}; std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities; std::get<4>(fnDataAr.at(i)) = index; - threadPool.queueFn([&func] (void *ud) { + threadPool->queueFn([&func] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<2>(*data).at(0); i < std::get<2>(*data).at(1); @@ -1672,10 +1675,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } } ); @@ -1694,7 +1697,7 @@ namespace EC template void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) { const BitsetType signatureBitset = BitsetType::template generateBitset(); - if(!useThreadPool) { + if(!useThreadPool || !threadPool) { for(std::size_t i = 0; i < currentSize; ++i) { if(!std::get(entities[i])) { continue; @@ -1723,7 +1726,7 @@ namespace EC std::get<2>(fnDataAr.at(i)) = &signatureBitset; std::get<3>(fnDataAr.at(i)) = {begin, end}; std::get<4>(fnDataAr.at(i)) = userData; - threadPool.queueFn([&fn] (void *ud) { + threadPool->queueFn([&fn] (void *ud) { auto *data = static_cast(ud); for(std::size_t i = std::get<3>(*data).at(0); i < std::get<3>(*data).at(1); @@ -1736,10 +1739,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } } @@ -1754,7 +1757,7 @@ namespace EC */ template void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) { - if(!useThreadPool) { + if(!useThreadPool || !threadPool) { bool isValid; for(std::size_t i = 0; i < currentSize; ++i) { if(!std::get(entities[i])) { @@ -1793,7 +1796,7 @@ namespace EC std::get<2>(fnDataAr.at(i)) = &iterable; std::get<3>(fnDataAr.at(i)) = {begin, end}; std::get<4>(fnDataAr.at(i)) = userData; - threadPool.queueFn([&fn] (void *ud) { + threadPool->queueFn([&fn] (void *ud) { auto *data = static_cast(ud); bool isValid; for(std::size_t i = std::get<3>(*data).at(0); @@ -1816,10 +1819,10 @@ namespace EC } }, &fnDataAr.at(i)); } - threadPool.wakeThreads(); + threadPool->wakeThreads(); do { std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool.isQueueEmpty() && !threadPool.isAllThreadsWaiting()); + } while(!threadPool->isQueueEmpty() && !threadPool->isAllThreadsWaiting()); } } }; diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp index aee681d..c7d078e 100644 --- a/src/EC/ThreadPool.hpp +++ b/src/EC/ThreadPool.hpp @@ -20,64 +20,66 @@ namespace Internal { using TPQueueType = std::queue; } // namespace Internal -template -class ThreadPool; - template -class ThreadPool= 2)>::type> { +class ThreadPool { public: using THREADCOUNT = std::integral_constant; ThreadPool() : waitCount(0) { isAlive.store(true); - for(unsigned int i = 0; i < SIZE; ++i) { - threads.emplace_back([] (std::atomic_bool *isAlive, - std::condition_variable *cv, - std::mutex *cvMutex, - Internal::TPQueueType *fnQueue, - std::mutex *queueMutex, - int *waitCount, - std::mutex *waitCountMutex) { - bool hasFn = false; - Internal::TPTupleType fnTuple; - while(isAlive->load()) { - hasFn = false; - { - std::lock_guard lock(*queueMutex); - if(!fnQueue->empty()) { - fnTuple = fnQueue->front(); - fnQueue->pop(); - hasFn = true; + if(SIZE >= 2) { + for(unsigned int i = 0; i < SIZE; ++i) { + threads.emplace_back([] (std::atomic_bool *isAlive, + std::condition_variable *cv, + std::mutex *cvMutex, + Internal::TPQueueType *fnQueue, + std::mutex *queueMutex, + int *waitCount, + std::mutex *waitCountMutex) { + bool hasFn = false; + Internal::TPTupleType fnTuple; + while(isAlive->load()) { + hasFn = false; + { + std::lock_guard lock(*queueMutex); + if(!fnQueue->empty()) { + fnTuple = fnQueue->front(); + fnQueue->pop(); + hasFn = true; + } + } + if(hasFn) { + std::get<0>(fnTuple)(std::get<1>(fnTuple)); + continue; + } + + { + std::lock_guard lock(*waitCountMutex); + *waitCount += 1; + } + { + std::unique_lock lock(*cvMutex); + cv->wait(lock); + } + { + std::lock_guard lock(*waitCountMutex); + *waitCount -= 1; } } - if(hasFn) { - std::get<0>(fnTuple)(std::get<1>(fnTuple)); - continue; - } - - { - std::lock_guard lock(*waitCountMutex); - *waitCount += 1; - } - { - std::unique_lock lock(*cvMutex); - cv->wait(lock); - } - { - std::lock_guard lock(*waitCountMutex); - *waitCount -= 1; - } - } - }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount, &waitCountMutex); + }, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount, + &waitCountMutex); + } } } ~ThreadPool() { - isAlive.store(false); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - cv.notify_all(); - for(auto &thread : threads) { - thread.join(); + if(SIZE >= 2) { + isAlive.store(false); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + cv.notify_all(); + for(auto &thread : threads) { + thread.join(); + } } } @@ -87,10 +89,32 @@ public: } void wakeThreads(bool wakeAll = true) { - if(wakeAll) { - cv.notify_all(); + if(SIZE >= 2) { + // wake threads to pull functions from queue and run them + if(wakeAll) { + cv.notify_all(); + } else { + cv.notify_one(); + } } else { - cv.notify_one(); + // pull functions from queue and run them on main thread + Internal::TPTupleType fnTuple; + bool hasFn; + do { + { + std::lock_guard lock(queueMutex); + if(!fnQueue.empty()) { + hasFn = true; + fnTuple = fnQueue.front(); + fnQueue.pop(); + } else { + hasFn = false; + } + } + if(hasFn) { + std::get<0>(fnTuple)(std::get<1>(fnTuple)); + } + } while(hasFn); } } @@ -100,8 +124,12 @@ public: } bool isAllThreadsWaiting() { - std::lock_guard lock(waitCountMutex); - return waitCount == THREADCOUNT::value; + if(SIZE >= 2) { + std::lock_guard lock(waitCountMutex); + return waitCount == THREADCOUNT::value; + } else { + return true; + } } bool isQueueEmpty() { diff --git a/src/test/ECTest.cpp b/src/test/ECTest.cpp index 3105903..75e65ae 100644 --- a/src/test/ECTest.cpp +++ b/src/test/ECTest.cpp @@ -1370,3 +1370,30 @@ TEST(EC, MultiThreadedForMatching) { EXPECT_TRUE(manager.isAlive(first)); EXPECT_FALSE(manager.isAlive(second)); } + +TEST(EC, ManagerWithLowThreadCount) { + EC::Manager manager; + + std::array entities; + for(auto &id : entities) { + id = manager.addEntity(); + manager.addComponent(id); + } + + for(const auto &id : entities) { + auto *component = manager.getEntityComponent(id); + EXPECT_EQ(component->x, 0); + EXPECT_EQ(component->y, 0); + } + + manager.forMatchingSignature >([] (std::size_t /*id*/, void* /*ud*/, C0 *c) { + c->x += 1; + c->y += 1; + }, nullptr, true); + + for(const auto &id : entities) { + auto *component = manager.getEntityComponent(id); + EXPECT_EQ(component->x, 1); + EXPECT_EQ(component->y, 1); + } +} diff --git a/src/test/ThreadPoolTest.cpp b/src/test/ThreadPoolTest.cpp index 2cec50c..31c8529 100644 --- a/src/test/ThreadPoolTest.cpp +++ b/src/test/ThreadPoolTest.cpp @@ -2,15 +2,42 @@ #include -//using OneThreadPool = EC::ThreadPool<1>; +using OneThreadPool = EC::ThreadPool<1>; using ThreeThreadPool = EC::ThreadPool<3>; -//TEST(ECThreadPool, CannotCompile) { -// OneThreadPool tp; -//} - -TEST(ECThreadPool, Simple) { - ThreeThreadPool p{}; +TEST(ECThreadPool, CannotCompile) { + OneThreadPool p; + std::atomic_int data; + data.store(0); + const auto fn = [](void *ud) { + auto *data = static_cast(ud); + data->fetch_add(1); + }; + + p.queueFn(fn, &data); + + p.wakeThreads(); + + do { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } while(!p.isQueueEmpty() && !p.isAllThreadsWaiting()); + + ASSERT_EQ(data.load(), 1); + + for(unsigned int i = 0; i < 10; ++i) { + p.queueFn(fn, &data); + } + p.wakeThreads(); + + do { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } while(!p.isQueueEmpty() && !p.isAllThreadsWaiting()); + + ASSERT_EQ(data.load(), 11); +} + +TEST(ECThreadPool, Simple) { + ThreeThreadPool p; std::atomic_int data; data.store(0); const auto fn = [](void *ud) { From 183eab10b37363dc6c287f251f2acd2b178c0239 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Tue, 7 Sep 2021 12:04:42 +0900 Subject: [PATCH 7/7] Update documentation --- src/EC/Manager.hpp | 86 +++++++++++++++++++++++++++++-------------- src/EC/ThreadPool.hpp | 37 +++++++++++++++++++ 2 files changed, 96 insertions(+), 27 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index a4eb821..1160759 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -49,12 +49,10 @@ namespace EC Note that all components must have a default constructor. An optional third template parameter may be given, which is the size of - the number of threads in the internal ThreadPool, and it must be at - least 2. If ThreadCount is 1 or less, the program will fail to compile. - Note that using the internal ThreadPool is optional; several member - functions of Manager accept a boolean indicating if the internal - ThreadPool is to be used. Always passing false for that value will - result in never using the ThreadPool. + the number of threads in the internal ThreadPool, and should be at + least 2. If ThreadCount is 1 or less, then the ThreadPool will not be + created and it will never be used, even if the "true" parameter is given + for functions that enable its usage. Example: \code{.cpp} @@ -481,16 +479,16 @@ namespace EC ).template getTagBit() = true; } - /*! - \brief Removes the given Tag from the given Entity. + /*! + \brief Removes the given Tag from the given Entity. - If the Entity does not have the Tag given, nothing will change. + If the Entity does not have the Tag given, nothing will change. - Example: - \code{.cpp} - manager.removeTag(entityID); - \endcode - */ + Example: + \code{.cpp} + manager.removeTag(entityID); + \endcode + */ template void removeTag(const std::size_t& entityID) { @@ -1319,6 +1317,14 @@ namespace EC The second parameter (default nullptr) will be provided to every function call as a void* (context). + The third parameter is default false (not multi-threaded). + Otherwise, if true, then the thread pool will be used to call the + given function in parallel across all entities. Note that + multi-threading is based on splitting the task of calling the + function across sections of entities. Thus if there are only a small + amount of entities in the manager, then using multiple threads may + not have as great of a speed-up. + This function was created for the use case where there are many entities in the system which can cause multiple calls to forMatchingSignature to be slow due to the overhead of iterating @@ -1513,6 +1519,14 @@ namespace EC The second parameter (default nullptr) will be provided to every function call as a void* (context). + The third parameter is default false (not multi-threaded). + Otherwise, if true, then the thread pool will be used to call the + given function in parallel across all entities. Note that + multi-threading is based on splitting the task of calling the + function across sections of entities. Thus if there are only a small + amount of entities in the manager, then using multiple threads may + not have as great of a speed-up. + This function was created for the use case where there are many entities in the system which can cause multiple calls to forMatchingSignature to be slow due to the overhead of iterating @@ -1687,12 +1701,21 @@ namespace EC typedef void ForMatchingFn(std::size_t, Manager*, void*); /*! - * \brief A simple version of forMatchingSignature() - * - * This function behaves like forMatchingSignature(), but instead of - * providing a function with each requested component as a parameter, - * the function receives a pointer to the manager itself, with which to - * query component/tag data. + \brief A simple version of forMatchingSignature() + + This function behaves like forMatchingSignature(), but instead of + providing a function with each requested component as a parameter, + the function receives a pointer to the manager itself, with which to + query component/tag data. + + The third parameter can be optionally used to enable the use of the + internal ThreadPool to call the function in parallel. Using the + value false (which is the default) will not use the ThreadPool and + run the function sequentially on all entities on the main thread. + Note that multi-threading is based on splitting the task of calling + the functions across sections of entities. Thus if there are only a + small amount of entities in the manager, then using multiple threads + may not have as great of a speed-up. */ template void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) { @@ -1747,13 +1770,22 @@ namespace EC } /*! - * \brief Similar to forMatchingSimple(), but with a collection of Component/Tag indices - * - * This function works like forMatchingSimple(), but instead of - * providing template types that filter out non-matching entities, an - * iterable of indices must be provided which correlate to matching - * Component/Tag indices. The function given must match the previously - * defined typedef of type ForMatchingFn. + \brief Similar to forMatchingSimple(), but with a collection of Component/Tag indices + + This function works like forMatchingSimple(), but instead of + providing template types that filter out non-matching entities, an + iterable of indices must be provided which correlate to matching + Component/Tag indices. The function given must match the previously + defined typedef of type ForMatchingFn. + + The fourth parameter can be optionally used to enable the use of the + internal ThreadPool to call the function in parallel. Using the + value false (which is the default) will not use the ThreadPool and + run the function sequentially on all entities on the main thread. + Note that multi-threading is based on splitting the task of calling + the functions across sections of entities. Thus if there are only a + small amount of entities in the manager, then using multiple threads + may not have as great of a speed-up. */ template void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) { diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp index c7d078e..3580b5e 100644 --- a/src/EC/ThreadPool.hpp +++ b/src/EC/ThreadPool.hpp @@ -20,6 +20,12 @@ namespace Internal { using TPQueueType = std::queue; } // namespace Internal +/*! + \brief Implementation of a Thread Pool. + + Note that if SIZE is less than 2, then ThreadPool will not create threads and + run queued functions on the calling thread. +*/ template class ThreadPool { public: @@ -83,11 +89,27 @@ public: } } + /*! + \brief Queues a function to be called (doesn't start calling yet). + + To run the queued functions, wakeThreads() must be called to wake the + waiting threads which will start pulling functions from the queue to be + called. + */ void queueFn(std::function&& fn, void *ud = nullptr) { std::lock_guard lock(queueMutex); fnQueue.emplace(std::make_tuple(fn, ud)); } + /*! + \brief Wakes waiting threads to start running queued functions. + + If SIZE is less than 2, then this function call will block until all the + queued functions have been executed on the calling thread. + + If SIZE is 2 or greater, then this function will return immediately after + waking one or all threads, depending on the given boolean parameter. + */ void wakeThreads(bool wakeAll = true) { if(SIZE >= 2) { // wake threads to pull functions from queue and run them @@ -118,11 +140,23 @@ public: } } + /*! + \brief Gets the number of waiting threads. + + If all threads are waiting, this should equal ThreadCount. + + If SIZE is less than 2, then this will always return 0. + */ int getWaitCount() { std::lock_guard lock(waitCountMutex); return waitCount; } + /*! + \brief Returns true if all threads are waiting. + + If SIZE is less than 2, then this will always return true. + */ bool isAllThreadsWaiting() { if(SIZE >= 2) { std::lock_guard lock(waitCountMutex); @@ -132,6 +166,9 @@ public: } } + /*! + \brief Returns true if the function queue is empty. + */ bool isQueueEmpty() { std::lock_guard lock(queueMutex); return fnQueue.empty();