From f44d2f8c7b2cfb342a442ef9160a03356d6a2b4f Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 6 Sep 2021 19:52:23 +0900 Subject: [PATCH] WIP convert Manager to use ThreadPool valgrind seems to report memory issues, and documentation may need more updating. --- src/EC/Manager.hpp | 813 +++++++++++++++++++++++--------------------- src/test/ECTest.cpp | 22 +- 2 files changed, 436 insertions(+), 399 deletions(-) diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index 8e4b08c..225ceb8 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -36,6 +36,8 @@ #include "Meta/IndexOf.hpp" #include "Bitset.hpp" +#include "ThreadPool.hpp" + namespace EC { /*! @@ -46,12 +48,20 @@ namespace EC Note that all components must have a default constructor. + An optional third template parameter may be given, which is the size of + the number of threads in the internal ThreadPool, and it must be at + least 2. If ThreadCount is 1 or less, the program will fail to compile. + Note that using the internal ThreadPool is optional; several member + functions of Manager accept a boolean indicating if the internal + ThreadPool is to be used. Always passing false for that value will + result in never using the ThreadPool. + Example: \code{.cpp} EC::Manager, TypeList> manager; \endcode */ - template + template struct Manager { public: @@ -83,6 +93,8 @@ namespace EC std::size_t currentSize = 0; std::unordered_set deletedSet; + ThreadPool threadPool; + public: /*! \brief Initializes the manager with a default capacity. @@ -516,11 +528,11 @@ namespace EC const std::size_t& entityID, CType& ctype, Function&& function, - void* context = nullptr) + void* userData = nullptr) { function( entityID, - context, + userData, ctype.template getEntityData(entityID)... ); } @@ -530,11 +542,11 @@ namespace EC const std::size_t& entityID, CType& ctype, Function* function, - void* context = nullptr) + void* userData= nullptr) { (*function)( entityID, - context, + userData, ctype.template getEntityData(entityID)... ); } @@ -544,13 +556,13 @@ namespace EC const std::size_t& entityID, CType& ctype, Function&& function, - void* context = nullptr) const + void* userData = nullptr) const { ForMatchingSignatureHelper::call( entityID, ctype, std::forward(function), - context); + userData); } template @@ -558,13 +570,13 @@ namespace EC const std::size_t& entityID, CType& ctype, Function* function, - void* context = nullptr) const + void* userData = nullptr) const { ForMatchingSignatureHelper::callPtr( entityID, ctype, function, - context); + userData); } }; @@ -581,14 +593,15 @@ namespace EC The second parameter is default nullptr and will be passed to the function call as the second parameter as a means of providing - context (useful when the function is not a lambda function). The - third parameter is default 1 (not multi-threaded). If the third - parameter threadCount is set to a value greater than 1, then - threadCount threads will be used. Note that multi-threading is - based on splitting the task of calling the function across sections - of entities. Thus if there are only a small amount of entities in - the manager, then using multiple threads may not have as great of a - speed-up. + context (useful when the function is not a lambda function). + + The third parameter is default false (not multi-threaded). + Otherwise, if true, then the thread pool will be used to call the + given function in parallel across all entities. Note that + multi-threading is based on splitting the task of calling the + function across sections of entities. Thus if there are only a small + amount of entities in the manager, then using multiple threads may + not have as great of a speed-up. Example: \code{.cpp} @@ -609,8 +622,8 @@ namespace EC */ template void forMatchingSignature(Function&& function, - void* context = nullptr, - std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { using SignatureComponents = typename EC::Meta::Matching::type; @@ -621,7 +634,7 @@ namespace EC BitsetType signatureBitset = BitsetType::template generateBitset(); - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -634,52 +647,53 @@ namespace EC == signatureBitset) { Helper::call(i, *this, - std::forward(function), context); + std::forward(function), userData); } } } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, void*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &function, &signatureBitset, &context] - (std::size_t begin, - std::size_t end) { - for(std::size_t i = begin; i < end; ++i) - { - if(!std::get(this->entities[i])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &signatureBitset; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + threadPool.queueFn([&function] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - if((signatureBitset - & std::get(entities[i])) - == signatureBitset) - { - Helper::call(i, *this, - std::forward(function), context); + if((*std::get<2>(*data) + & std::get( + std::get<1>(*data)->at(i))) + == *std::get<2>(*data)) { + Helper::call(i, *std::get<0>(*data), std::forward(function), std::get<4>(*data)); } } - }, - begin, - end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -696,14 +710,15 @@ namespace EC The second parameter is default nullptr and will be passed to the function call as the second parameter as a means of providing - context (useful when the function is not a lambda function). The - third parameter is default 1 (not multi-threaded). If the third - parameter threadCount is set to a value greater than 1, then - threadCount threads will be used. Note that multi-threading is based - on splitting the task of calling the function across sections of - entities. Thus if there are only a small amount of entities in the - manager, then using multiple threads may not have as great of a - speed-up. + context (useful when the function is not a lambda function). + + The third parameter is default false (not multi-threaded). + Otherwise, if true, then the thread pool will be used to call the + given function in parallel across all entities. Note that + multi-threading is based on splitting the task of calling the + function across sections of entities. Thus if there are only a small + amount of entities in the manager, then using multiple threads may + not have as great of a speed-up. Example: \code{.cpp} @@ -726,8 +741,8 @@ namespace EC */ template void forMatchingSignaturePtr(Function* function, - void* context = nullptr, - std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { using SignatureComponents = typename EC::Meta::Matching::type; @@ -738,7 +753,7 @@ namespace EC BitsetType signatureBitset = BitsetType::template generateBitset(); - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -750,51 +765,54 @@ namespace EC if((signatureBitset & std::get(entities[i])) == signatureBitset) { - Helper::callPtr(i, *this, function, context); + Helper::callPtr(i, *this, function, userData); } } } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, void*, Function*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &function, &signatureBitset, &context] - (std::size_t begin, - std::size_t end) { - for(std::size_t i = begin; i < end; ++i) - { - if(!std::get(this->entities[i])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &signatureBitset; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + std::get<5>(fnDataAr.at(i)) = function; + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - if((signatureBitset - & std::get(entities[i])) - == signatureBitset) - { - Helper::callPtr(i, *this, function, context); + if((*std::get<2>(*data) + & std::get( + std::get<1>(*data)->at(i))) + == *std::get<2>(*data)) { + Helper::callPtr(i, *std::get<0>(*data), std::get<5>(*data), std::get<4>(*data)); } } - }, - begin, - end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -859,7 +877,7 @@ namespace EC template std::size_t addForMatchingFunction( Function&& function, - void* context = nullptr) + void* userData = nullptr) { while(forMatchingFunctions.find(functionIndex) != forMatchingFunctions.end()) @@ -882,54 +900,63 @@ namespace EC functionIndex, std::make_tuple( signatureBitset, - context, + userData, [function, helper, this] - (std::size_t threadCount, + (const bool useThreadPool, std::vector matching, - void* context) + void* userData) { - if(threadCount <= 1 || matching.size() < threadCount) + if(!useThreadPool) { for(auto eid : matching) { if(isAlive(eid)) { helper.callInstancePtr( - eid, *this, &function, context); + eid, *this, &function, userData); } } } else { - std::vector threads(threadCount); - std::size_t s = matching.size() / threadCount; - for(std::size_t i = 0; i < threadCount; ++ i) - { + using TPFnDataType = std::tuple, void*, const std::vector*>; + std::array fnDataAr; + + std::size_t s = matching.size() / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) { + if(i == ThreadCount - 1) { end = matching.size(); } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &function, &helper, &context, &matching] - (std::size_t begin, - std::size_t end) { - for(std::size_t j = begin; j < end; ++j) - { - if(isAlive(matching[j])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = {begin, end}; + std::get<3>(fnDataAr.at(i)) = userData; + std::get<4>(fnDataAr.at(i)) = &matching; + threadPool.queueFn([function, helper] (void* ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<2>(*data).at(0); + i < std::get<2>(*data).at(1); + ++i) { + if(std::get<0>(*data)->isAlive(std::get<4>(*data)->at(i))) { helper.callInstancePtr( - matching[j], *this, &function, context); + std::get<4>(*data)->at(i), + *std::get<0>(*data), + &function, + std::get<3>(*data)); } } - }, - begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } }))); @@ -939,11 +966,11 @@ namespace EC private: std::vector > getMatchingEntities( - std::vector bitsets, std::size_t threadCount = 1) + std::vector bitsets, const bool useThreadPool = false) { std::vector > matchingV(bitsets.size()); - if(threadCount <= 1 || currentSize <= threadCount) + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { @@ -963,63 +990,52 @@ namespace EC } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - std::mutex mutex; + using TPFnDataType = std::tuple, std::vector >*, const std::vector*, EntitiesType*, std::mutex*>; + std::array fnDataAr; - if(s == 0) { - for(std::size_t i = 0; i < currentSize; ++i) { - threads[i] = std::thread( - [this, &matchingV, &bitsets, &mutex] (std::size_t idx) { - if(!isAlive(idx)) { - return; + std::size_t s = currentSize / ThreadCount; + std::mutex mutex; + for(std::size_t i = 0; i < ThreadCount; ++i) { + std::size_t begin = s * i; + std::size_t end; + if(i == ThreadCount - 1) { + end = currentSize; + } else { + end = s * (i + 1); + } + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = {begin, end}; + std::get<2>(fnDataAr.at(i)) = &matchingV; + std::get<3>(fnDataAr.at(i)) = &bitsets; + std::get<4>(fnDataAr.at(i)) = &entities; + std::get<5>(fnDataAr.at(i)) = &mutex; + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<1>(*data).at(0); + i < std::get<1>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { + continue; } - for(std::size_t k = 0; k < bitsets.size(); ++k) - { - if(((*bitsets[k]) & - std::get(entities[idx])) - == (*bitsets[k])) - { - std::lock_guard guard(mutex); - matchingV[k].push_back(idx); + for(std::size_t j = 0; + j < std::get<3>(*data)->size(); + ++j) { + if(((*std::get<3>(*data)->at(j)) + & std::get(std::get<4>(*data)->at(i))) + == (*std::get<3>(*data)->at(j))) { + std::lock_guard lock(*std::get<5>(*data)); + std::get<2>(*data)->at(j).push_back(i); } } - }, i); - } - for(std::size_t i = 0; i < currentSize; ++i) { - threads[i].join(); - } - } else { - for (std::size_t i = 0; i < threadCount; ++i) { - std::size_t begin = s * i; - std::size_t end; - if (i == threadCount - 1) { - end = currentSize; - } else { - end = s * (i + 1); } - - threads[i] = std::thread( - [this, &matchingV, &bitsets, &mutex] - (std::size_t begin, std::size_t end) { - for (std::size_t j = begin; j < end; ++j) { - if (!isAlive(j)) { - continue; - } - for (std::size_t k = 0; k < bitsets.size(); ++k) { - if (((*bitsets[k]) & - std::get(entities[j])) - == (*bitsets[k])) { - std::lock_guard guard(mutex); - matchingV[k].push_back(j); - } - } - } - }, begin, end); - } - for (std::size_t i = 0; i < threadCount; ++i) { - threads[i].join(); - } + }, &fnDataAr.at(i)); + } + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -1031,12 +1047,13 @@ namespace EC /*! \brief Call all stored functions. - The first (and only) parameter can be optionally used to specify the - number of threads to use when calling the functions. Otherwise, this - function is by default not multi-threaded. - Note that multi-threading is based on splitting the task of calling - the functions across sections of entities. Thus if there are only - a small amount of entities in the manager, then using multiple + The first (and only) parameter can be optionally used to enable the + use of the internal ThreadPool to call all stored functions in + parallel. Using the value false (which is the default) will not use + the ThreadPool and run all stored functions sequentially on the main + thread. Note that multi-threading is based on splitting the task of + calling the functions across sections of entities. Thus if there are + only a small amount of entities in the manager, then using multiple threads may not have as great of a speed-up. Example: @@ -1058,7 +1075,7 @@ namespace EC manager.clearForMatchingFunctions(); \endcode */ - void callForMatchingFunctions(std::size_t threadCount = 1) + void callForMatchingFunctions(const bool useThreadPool = false) { std::vector bitsets; for(auto iter = forMatchingFunctions.begin(); @@ -1069,7 +1086,7 @@ namespace EC } std::vector > matching = - getMatchingEntities(bitsets, threadCount); + getMatchingEntities(bitsets, useThreadPool); std::size_t i = 0; for(auto iter = forMatchingFunctions.begin(); @@ -1077,20 +1094,21 @@ namespace EC ++iter) { std::get<2>(iter->second)( - threadCount, matching[i++], std::get<1>(iter->second)); + useThreadPool, matching[i++], std::get<1>(iter->second)); } } /*! \brief Call a specific stored function. - A second parameter can be optionally used to specify the number - of threads to use when calling the function. Otherwise, this - function is by default not multi-threaded. - Note that multi-threading is based on splitting the task of calling - the function across sections of entities. Thus if there are only - a small amount of entities in the manager, then using multiple - threads may not have as great of a speed-up. + The second parameter can be optionally used to enable the use of the + internal ThreadPool to call the stored function in parallel. Using + the value false (which is the default) will not use the ThreadPool + and run the stored function sequentially on the main thread. Note + that multi-threading is based on splitting the task of calling the + functions across sections of entities. Thus if there are only a + small amount of entities in the manager, then using multiple threads + may not have as great of a speed-up. Example: \code{.cpp} @@ -1110,7 +1128,7 @@ namespace EC \return False if a function with the given id does not exist. */ bool callForMatchingFunction(std::size_t id, - std::size_t threadCount = 1) + const bool useThreadPool = false) { auto iter = forMatchingFunctions.find(id); if(iter == forMatchingFunctions.end()) @@ -1119,9 +1137,9 @@ namespace EC } std::vector > matching = getMatchingEntities(std::vector{ - &std::get(iter->second)}, threadCount); + &std::get(iter->second)}, useThreadPool); std::get<2>(iter->second)( - threadCount, matching[0], std::get<1>(iter->second)); + useThreadPool, matching[0], std::get<1>(iter->second)); return true; } @@ -1263,12 +1281,12 @@ namespace EC \return True if id is valid and context was updated */ - bool changeForMatchingFunctionContext(std::size_t id, void* context) + bool changeForMatchingFunctionContext(std::size_t id, void* userData) { auto f = forMatchingFunctions.find(id); if(f != forMatchingFunctions.end()) { - std::get<1>(f->second) = context; + std::get<1>(f->second) = userData; return true; } return false; @@ -1317,11 +1335,11 @@ namespace EC template void forMatchingSignatures( FTuple fTuple, - void* context = nullptr, - const std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { - std::vector > multiMatchingEntities( - SigList::size); + std::vector > + multiMatchingEntities(SigList::size); BitsetType signatureBitsets[SigList::size]; // generate bitsets for each signature @@ -1333,7 +1351,7 @@ namespace EC }); // find and store entities matching signatures - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t eid = 0; eid < currentSize; ++eid) { @@ -1354,48 +1372,49 @@ namespace EC } else { - std::vector threads(threadCount); - std::mutex mutexes[SigList::size]; - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, std::vector >*, BitsetType*, std::mutex*>; + std::array fnDataAr; + + std::mutex mutex; + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &mutexes, &multiMatchingEntities, &signatureBitsets] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; ++j) - { - if(!isAlive(j)) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = {begin, end}; + std::get<2>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<3>(fnDataAr.at(i)) = signatureBitsets; + std::get<4>(fnDataAr.at(i)) = &mutex; + + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<1>(*data).at(0); + i < std::get<1>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - for(std::size_t k = 0; k < SigList::size; ++k) - { - if((signatureBitsets[k] - & std::get(entities[j])) - == signatureBitsets[k]) - { - std::lock_guard guard( - mutexes[k]); - multiMatchingEntities[k].push_back(j); + for(std::size_t j = 0; j < SigList::size; ++j) { + if((std::get<3>(*data)[j] & std::get(std::get<0>(*data)->entities[i])) + == std::get<3>(*data)[j]) { + std::lock_guard lock(*std::get<4>(*data)); + std::get<2>(*data)->at(j).push_back(i); } } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -1403,7 +1422,7 @@ namespace EC EC::Meta::forEachDoubleTuple( EC::Meta::Morph >{}, fTuple, - [this, &multiMatchingEntities, &threadCount, &context] + [this, &multiMatchingEntities, useThreadPool, &userData] (auto sig, auto func, auto index) { using SignatureComponents = @@ -1413,55 +1432,51 @@ namespace EC EC::Meta::Morph< SignatureComponents, ForMatchingSignatureHelper<> >; - if(threadCount <= 1) - { - for(const auto& id : multiMatchingEntities[index]) - { - if(isAlive(id)) - { - Helper::call(id, *this, func, context); + if(!useThreadPool) { + for(const auto& id : multiMatchingEntities[index]) { + if(isAlive(id)) { + Helper::call(id, *this, func, userData); } } - } - else - { - std::vector threads(threadCount); + } else { + using TPFnType = std::tuple, std::vector > *, std::size_t>; + std::array fnDataAr; std::size_t s = multiMatchingEntities[index].size() - / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + / ThreadCount; + for(unsigned int i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = multiMatchingEntities[index].size(); - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &multiMatchingEntities, &index, &func, - &context] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; - ++j) - { - if(isAlive(multiMatchingEntities[index][j])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = userData; + std::get<2>(fnDataAr.at(i)) = {begin, end}; + std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<4>(fnDataAr.at(i)) = index; + threadPool.queueFn([&func] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<2>(*data).at(0); + i < std::get<2>(*data).at(1); + ++i) { + if(std::get<0>(*data)->isAlive(std::get<3>(*data)->at(std::get<4>(*data)).at(i))) { Helper::call( - multiMatchingEntities[index][j], - *this, + std::get<3>(*data)->at(std::get<4>(*data)).at(i), + *std::get<0>(*data), func, - context); + std::get<1>(*data)); } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -1513,8 +1528,8 @@ namespace EC */ template void forMatchingSignaturesPtr(FTuple fTuple, - void* context = nullptr, - std::size_t threadCount = 1) + void* userData = nullptr, + const bool useThreadPool = false) { std::vector > multiMatchingEntities( SigList::size); @@ -1529,7 +1544,7 @@ namespace EC }); // find and store entities matching signatures - if(threadCount <= 1) + if(!useThreadPool) { for(std::size_t eid = 0; eid < currentSize; ++eid) { @@ -1550,48 +1565,49 @@ namespace EC } else { - std::vector threads(threadCount); - std::mutex mutexes[SigList::size]; - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + using TPFnDataType = std::tuple, std::vector >*, BitsetType*, std::mutex*>; + std::array fnDataAr; + + std::mutex mutex; + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = currentSize; - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &mutexes, &multiMatchingEntities, &signatureBitsets] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; ++j) - { - if(!isAlive(j)) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = {begin, end}; + std::get<2>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<3>(fnDataAr.at(i)) = signatureBitsets; + std::get<4>(fnDataAr.at(i)) = &mutex; + + threadPool.queueFn([] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<1>(*data).at(0); + i < std::get<1>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { continue; } - for(std::size_t k = 0; k < SigList::size; ++k) - { - if((signatureBitsets[k] - & std::get(entities[j])) - == signatureBitsets[k]) - { - std::lock_guard guard( - mutexes[k]); - multiMatchingEntities[k].push_back(j); + for(std::size_t j = 0; j < SigList::size; ++j) { + if((std::get<3>(*data)[j] & std::get(std::get<0>(*data)->entities[i])) + == std::get<3>(*data)[j]) { + std::lock_guard lock(*std::get<4>(*data)); + std::get<2>(*data)->at(j).push_back(i); } } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } @@ -1599,7 +1615,7 @@ namespace EC EC::Meta::forEachDoubleTuple( EC::Meta::Morph >{}, fTuple, - [this, &multiMatchingEntities, &threadCount, &context] + [this, &multiMatchingEntities, useThreadPool, &userData] (auto sig, auto func, auto index) { using SignatureComponents = @@ -1609,55 +1625,56 @@ namespace EC EC::Meta::Morph< SignatureComponents, ForMatchingSignatureHelper<> >; - if(threadCount <= 1) + if(!useThreadPool) { for(const auto& id : multiMatchingEntities[index]) { if(isAlive(id)) { - Helper::callPtr(id, *this, func, context); + Helper::callPtr(id, *this, func, userData); } } } else { - std::vector threads(threadCount); + using TPFnType = std::tuple, std::vector > *, std::size_t>; + std::array fnDataAr; std::size_t s = multiMatchingEntities[index].size() - / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) - { + / ThreadCount; + for(unsigned int i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; std::size_t end; - if(i == threadCount - 1) - { + if(i == ThreadCount - 1) { end = multiMatchingEntities[index].size(); - } - else - { + } else { end = s * (i + 1); } - threads[i] = std::thread( - [this, &multiMatchingEntities, &index, &func, - &context] - (std::size_t begin, std::size_t end) - { - for(std::size_t j = begin; j < end; - ++j) - { - if(isAlive(multiMatchingEntities[index][j])) - { + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = userData; + std::get<2>(fnDataAr.at(i)) = {begin, end}; + std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities; + std::get<4>(fnDataAr.at(i)) = index; + threadPool.queueFn([&func] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<2>(*data).at(0); + i < std::get<2>(*data).at(1); + ++i) { + if(std::get<0>(*data)->isAlive(std::get<3>(*data)->at(std::get<4>(*data)).at(i))) { Helper::callPtr( - multiMatchingEntities[index][j], - *this, + std::get<3>(*data)->at(std::get<4>(*data)).at(i), + *std::get<0>(*data), func, - context); + std::get<1>(*data)); } } - }, begin, end); + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) - { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -1675,9 +1692,9 @@ namespace EC * query component/tag data. */ template - void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, std::size_t threadCount = 1) { + void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) { const BitsetType signatureBitset = BitsetType::template generateBitset(); - if(threadCount <= 1) { + if(!useThreadPool) { for(std::size_t i = 0; i < currentSize; ++i) { if(!std::get(entities[i])) { continue; @@ -1686,36 +1703,42 @@ namespace EC } } } else { - std::vector threads(threadCount); - const std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) { - const std::size_t begin = s * i; - const std::size_t end = - i == threadCount - 1 ? - currentSize : - s * (i + 1); - threads[i] = std::thread( - [this] (const std::size_t begin, - const std::size_t end, - const BitsetType signatureBitset, - ForMatchingFn fn, - void *userData) { - for(std::size_t i = begin; i < end; ++i) { - if(!std::get(entities[i])) { - continue; - } else if((signatureBitset & std::get(entities[i])) == signatureBitset) { - fn(i, this, userData); - } + using TPFnDataType = std::tuple, void*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { + std::size_t begin = s * i; + std::size_t end; + if(i == ThreadCount - 1) { + end = currentSize; + } else { + end = s * (i + 1); + } + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &signatureBitset; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + threadPool.queueFn([&fn] (void *ud) { + auto *data = static_cast(ud); + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { + continue; + } else if((*std::get<2>(*data) & std::get(std::get<1>(*data)->at(i))) == *std::get<2>(*data)) { + fn(i, std::get<0>(*data), std::get<4>(*data)); } - }, - begin, - end, - signatureBitset, - fn, - userData); + } + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } @@ -1730,8 +1753,8 @@ namespace EC * defined typedef of type ForMatchingFn. */ template - void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userPtr = nullptr, std::size_t threadCount = 1) { - if(threadCount <= 1) { + void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) { + if(!useThreadPool) { bool isValid; for(std::size_t i = 0; i < currentSize; ++i) { if(!std::get(entities[i])) { @@ -1747,41 +1770,55 @@ namespace EC } if(!isValid) { continue; } - fn(i, this, userPtr); + fn(i, this, userData); } } else { - std::vector threads(threadCount); - std::size_t s = currentSize / threadCount; - for(std::size_t i = 0; i < threadCount; ++i) { + using TPFnDataType = std::tuple, void*>; + std::array fnDataAr; + + std::size_t s = currentSize / ThreadCount; + for(std::size_t i = 0; i < ThreadCount; ++i) { std::size_t begin = s * i; - std::size_t end = - i == threadCount - 1 ? - currentSize : - s * (i + 1); - threads[i] = std::thread( - [this, &fn, &iterable, userPtr] (std::size_t begin, std::size_t end) { - bool isValid; - for(std::size_t i = begin; i < end; ++i) { - if(!std::get(this->entities[i])) { - continue; - } - - isValid = true; - for(const auto& integralValue : iterable) { - if(!std::get(entities[i]).getCombinedBit(integralValue)) { - isValid = false; - break; - } - } - if(!isValid) { continue; } - - fn(i, this, userPtr); + std::size_t end; + if(i == ThreadCount - 1) { + end = currentSize; + } else { + end = s * (i + 1); + } + if(begin == end) { + continue; + } + std::get<0>(fnDataAr.at(i)) = this; + std::get<1>(fnDataAr.at(i)) = &entities; + std::get<2>(fnDataAr.at(i)) = &iterable; + std::get<3>(fnDataAr.at(i)) = {begin, end}; + std::get<4>(fnDataAr.at(i)) = userData; + threadPool.queueFn([&fn] (void *ud) { + auto *data = static_cast(ud); + bool isValid; + for(std::size_t i = std::get<3>(*data).at(0); + i < std::get<3>(*data).at(1); + ++i) { + if(!std::get<0>(*data)->isAlive(i)) { + continue; } - }, - begin, end); + isValid = true; + for(const auto& integralValue : *std::get<2>(*data)) { + if(!std::get(std::get<1>(*data)->at(i)).getCombinedBit(integralValue)) { + isValid = false; + break; + } + } + if(!isValid) { continue; } + + fn(i, std::get<0>(*data), std::get<4>(*data)); + + } + }, &fnDataAr.at(i)); } - for(std::size_t i = 0; i < threadCount; ++i) { - threads[i].join(); + threadPool.wakeThreads(); + while(!threadPool.isAllThreadsWaiting()) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } } diff --git a/src/test/ECTest.cpp b/src/test/ECTest.cpp index 24b8b11..3105903 100644 --- a/src/test/ECTest.cpp +++ b/src/test/ECTest.cpp @@ -464,7 +464,7 @@ TEST(EC, MultiThreaded) c->y = 2; }, nullptr, - 2 + true ); for(unsigned int i = 0; i < 17; ++i) @@ -490,7 +490,7 @@ TEST(EC, MultiThreaded) c->y = 4; }, nullptr, - 8 + true ); for(unsigned int i = 0; i < 3; ++i) @@ -516,7 +516,7 @@ TEST(EC, MultiThreaded) } ); - manager.callForMatchingFunctions(2); + manager.callForMatchingFunctions(true); for(unsigned int i = 0; i < 17; ++i) { @@ -531,7 +531,7 @@ TEST(EC, MultiThreaded) } ); - manager.callForMatchingFunction(f1, 4); + manager.callForMatchingFunction(f1, true); for(unsigned int i = 0; i < 17; ++i) { @@ -544,7 +544,7 @@ TEST(EC, MultiThreaded) manager.deleteEntity(i); } - manager.callForMatchingFunction(f0, 8); + manager.callForMatchingFunction(f0, true); for(unsigned int i = 0; i < 4; ++i) { @@ -710,7 +710,7 @@ TEST(EC, ForMatchingSignatures) c0->y = 2; }), nullptr, - 3 + true ); for(auto iter = cx.begin(); iter != cx.end(); ++iter) @@ -850,7 +850,7 @@ TEST(EC, forMatchingPtrs) &func0 ); manager.forMatchingSignaturePtr >( - &func1 + &func1, nullptr, true ); for(auto eid : e) @@ -1098,7 +1098,7 @@ TEST(EC, forMatchingSimple) { C0 *c0 = manager->getEntityData(id); c0->x += 10; c0->y += 10; - }, nullptr, 3); + }, nullptr, true); // verify { @@ -1296,7 +1296,7 @@ TEST(EC, forMatchingIterableFn) c->x += 100; c->y += 100; }; - manager.forMatchingIterable(iterable, fn, nullptr, 3); + manager.forMatchingIterable(iterable, fn, nullptr, true); } { @@ -1322,7 +1322,7 @@ TEST(EC, forMatchingIterableFn) c->x += 1000; c->y += 1000; }; - manager.forMatchingIterable(iterable, fn, nullptr, 3); + manager.forMatchingIterable(iterable, fn, nullptr, true); } { @@ -1365,7 +1365,7 @@ TEST(EC, MultiThreadedForMatching) { EXPECT_TRUE(manager.isAlive(first)); EXPECT_TRUE(manager.isAlive(second)); - manager.callForMatchingFunction(fnIdx, 2); + manager.callForMatchingFunction(fnIdx, true); EXPECT_TRUE(manager.isAlive(first)); EXPECT_FALSE(manager.isAlive(second));