]> git.seodisparate.com - EntityComponentMetaSystem/commitdiff
WIP convert Manager to use ThreadPool
authorStephen Seo <seo.disparate@gmail.com>
Mon, 6 Sep 2021 10:52:23 +0000 (19:52 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Mon, 6 Sep 2021 10:52:23 +0000 (19:52 +0900)
valgrind seems to report memory issues, and documentation may need more
updating.

src/EC/Manager.hpp
src/test/ECTest.cpp

index 8e4b08cea3d1dd2ad0318e599e76109982dd74a3..225ceb84750fd1383a6eab84fdf9c711f7a87f7d 100644 (file)
@@ -36,6 +36,8 @@
 #include "Meta/IndexOf.hpp"
 #include "Bitset.hpp"
 
+#include "ThreadPool.hpp"
+
 namespace EC
 {
     /*!
@@ -46,12 +48,20 @@ namespace EC
 
         Note that all components must have a default constructor.
 
+        An optional third template parameter may be given, which is the size of
+        the number of threads in the internal ThreadPool, and it must be at
+        least 2. If ThreadCount is 1 or less, the program will fail to compile.
+        Note that using the internal ThreadPool is optional; several member
+        functions of Manager accept a boolean indicating if the internal
+        ThreadPool is to be used. Always passing false for that value will
+        result in never using the ThreadPool.
+
         Example:
         \code{.cpp}
             EC::Manager<TypeList<C0, C1, C2>, TypeList<T0, T1>> manager;
         \endcode
     */
-    template <typename ComponentsList, typename TagsList>
+    template <typename ComponentsList, typename TagsList, unsigned int ThreadCount = 4>
     struct Manager
     {
     public:
@@ -83,6 +93,8 @@ namespace EC
         std::size_t currentSize = 0;
         std::unordered_set<std::size_t> deletedSet;
 
+        ThreadPool<ThreadCount> threadPool;
+
     public:
         /*!
             \brief Initializes the manager with a default capacity.
@@ -516,11 +528,11 @@ namespace EC
                 const std::size_t& entityID,
                 CType& ctype,
                 Function&& function,
-                void* context = nullptr)
+                void* userData = nullptr)
             {
                 function(
                     entityID,
-                    context,
+                    userData,
                     ctype.template getEntityData<Types>(entityID)...
                 );
             }
@@ -530,11 +542,11 @@ namespace EC
                 const std::size_t& entityID,
                 CType& ctype,
                 Function* function,
-                void* context = nullptr)
+                void* userData= nullptr)
             {
                 (*function)(
                     entityID,
-                    context,
+                    userData,
                     ctype.template getEntityData<Types>(entityID)...
                 );
             }
@@ -544,13 +556,13 @@ namespace EC
                 const std::size_t& entityID,
                 CType& ctype,
                 Function&& function,
-                void* context = nullptr) const
+                void* userData = nullptr) const
             {
                 ForMatchingSignatureHelper<Types...>::call(
                     entityID,
                     ctype,
                     std::forward<Function>(function),
-                    context);
+                    userData);
             }
 
             template <typename CType, typename Function>
@@ -558,13 +570,13 @@ namespace EC
                 const std::size_t& entityID,
                 CType& ctype,
                 Function* function,
-                void* context = nullptr) const
+                void* userData = nullptr) const
             {
                 ForMatchingSignatureHelper<Types...>::callPtr(
                     entityID,
                     ctype,
                     function,
-                    context);
+                    userData);
             }
         };
 
@@ -581,14 +593,15 @@ namespace EC
 
             The second parameter is default nullptr and will be passed to the
             function call as the second parameter as a means of providing
-            context (useful when the function is not a lambda function). The
-            third parameter is default 1 (not multi-threaded). If the third
-            parameter threadCount is set to a value greater than 1, then
-            threadCount threads will be used.  Note that multi-threading is
-            based on splitting the task of calling the function across sections
-            of entities. Thus if there are only a small amount of entities in
-            the manager, then using multiple threads may not have as great of a
-            speed-up.
+            context (useful when the function is not a lambda function).
+
+            The third parameter is default false (not multi-threaded).
+            Otherwise, if true, then the thread pool will be used to call the
+            given function in parallel across all entities. Note that
+            multi-threading is based on splitting the task of calling the
+            function across sections of entities. Thus if there are only a small
+            amount of entities in the manager, then using multiple threads may
+            not have as great of a speed-up.
 
             Example:
             \code{.cpp}
@@ -609,8 +622,8 @@ namespace EC
         */
         template <typename Signature, typename Function>
         void forMatchingSignature(Function&& function,
-            void* context = nullptr,
-            std::size_t threadCount = 1)
+            void* userData = nullptr,
+            const bool useThreadPool = false)
         {
             using SignatureComponents =
                 typename EC::Meta::Matching<Signature, ComponentsList>::type;
@@ -621,7 +634,7 @@ namespace EC
 
             BitsetType signatureBitset =
                 BitsetType::template generateBitset<Signature>();
-            if(threadCount <= 1)
+            if(!useThreadPool)
             {
                 for(std::size_t i = 0; i < currentSize; ++i)
                 {
@@ -634,52 +647,53 @@ namespace EC
                         == signatureBitset)
                     {
                         Helper::call(i, *this,
-                            std::forward<Function>(function), context);
+                            std::forward<Function>(function), userData);
                     }
                 }
             }
             else
             {
-                std::vector<std::thread> threads(threadCount);
-                std::size_t s = currentSize / threadCount;
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
+                using TPFnDataType = std::tuple<Manager*, EntitiesType*, BitsetType*, std::array<std::size_t, 2>, void*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                std::size_t s = currentSize / ThreadCount;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
                     std::size_t begin = s * i;
                     std::size_t end;
-                    if(i == threadCount - 1)
-                    {
+                    if(i == ThreadCount - 1) {
                         end = currentSize;
-                    }
-                    else
-                    {
+                    } else {
                         end = s * (i + 1);
                     }
-                    threads[i] = std::thread(
-                        [this, &function, &signatureBitset, &context]
-                            (std::size_t begin,
-                            std::size_t end) {
-                        for(std::size_t i = begin; i < end; ++i)
-                        {
-                            if(!std::get<bool>(this->entities[i]))
-                            {
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = &entities;
+                    std::get<2>(fnDataAr.at(i)) = &signatureBitset;
+                    std::get<3>(fnDataAr.at(i)) = {begin, end};
+                    std::get<4>(fnDataAr.at(i)) = userData;
+                    threadPool.queueFn([&function] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        for(std::size_t i = std::get<3>(*data).at(0);
+                                i < std::get<3>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
                                 continue;
                             }
 
-                            if((signatureBitset
-                                    & std::get<BitsetType>(entities[i]))
-                                == signatureBitset)
-                            {
-                                Helper::call(i, *this,
-                                    std::forward<Function>(function), context);
+                            if((*std::get<2>(*data)
+                                    & std::get<BitsetType>(
+                                        std::get<1>(*data)->at(i)))
+                                    == *std::get<2>(*data)) {
+                                Helper::call(i, *std::get<0>(*data), std::forward<Function>(function), std::get<4>(*data));
                             }
                         }
-                    },
-                        begin,
-                        end);
+                    }, &fnDataAr.at(i));
                 }
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
-                    threads[i].join();
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
         }
@@ -696,14 +710,15 @@ namespace EC
 
             The second parameter is default nullptr and will be passed to the
             function call as the second parameter as a means of providing
-            context (useful when the function is not a lambda function). The
-            third parameter is default 1 (not multi-threaded). If the third
-            parameter threadCount is set to a value greater than 1, then
-            threadCount threads will be used. Note that multi-threading is based
-            on splitting the task of calling the function across sections of
-            entities. Thus if there are only a small amount of entities in the
-            manager, then using multiple threads may not have as great of a
-            speed-up.
+            context (useful when the function is not a lambda function).
+
+            The third parameter is default false (not multi-threaded).
+            Otherwise, if true, then the thread pool will be used to call the
+            given function in parallel across all entities. Note that
+            multi-threading is based on splitting the task of calling the
+            function across sections of entities. Thus if there are only a small
+            amount of entities in the manager, then using multiple threads may
+            not have as great of a speed-up.
 
             Example:
             \code{.cpp}
@@ -726,8 +741,8 @@ namespace EC
         */
         template <typename Signature, typename Function>
         void forMatchingSignaturePtr(Function* function,
-            void* context = nullptr,
-            std::size_t threadCount = 1)
+            void* userData = nullptr,
+            const bool useThreadPool = false)
         {
             using SignatureComponents =
                 typename EC::Meta::Matching<Signature, ComponentsList>::type;
@@ -738,7 +753,7 @@ namespace EC
 
             BitsetType signatureBitset =
                 BitsetType::template generateBitset<Signature>();
-            if(threadCount <= 1)
+            if(!useThreadPool)
             {
                 for(std::size_t i = 0; i < currentSize; ++i)
                 {
@@ -750,51 +765,54 @@ namespace EC
                     if((signatureBitset & std::get<BitsetType>(entities[i]))
                         == signatureBitset)
                     {
-                        Helper::callPtr(i, *this, function, context);
+                        Helper::callPtr(i, *this, function, userData);
                     }
                 }
             }
             else
             {
-                std::vector<std::thread> threads(threadCount);
-                std::size_t s = currentSize / threadCount;
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
+                using TPFnDataType = std::tuple<Manager*, EntitiesType*, BitsetType*, std::array<std::size_t, 2>, void*, Function*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                std::size_t s = currentSize / ThreadCount;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
                     std::size_t begin = s * i;
                     std::size_t end;
-                    if(i == threadCount - 1)
-                    {
+                    if(i == ThreadCount - 1) {
                         end = currentSize;
-                    }
-                    else
-                    {
+                    } else {
                         end = s * (i + 1);
                     }
-                    threads[i] = std::thread(
-                        [this, &function, &signatureBitset, &context]
-                            (std::size_t begin,
-                            std::size_t end) {
-                        for(std::size_t i = begin; i < end; ++i)
-                        {
-                            if(!std::get<bool>(this->entities[i]))
-                            {
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = &entities;
+                    std::get<2>(fnDataAr.at(i)) = &signatureBitset;
+                    std::get<3>(fnDataAr.at(i)) = {begin, end};
+                    std::get<4>(fnDataAr.at(i)) = userData;
+                    std::get<5>(fnDataAr.at(i)) = function;
+                    threadPool.queueFn([] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        for(std::size_t i = std::get<3>(*data).at(0);
+                                i < std::get<3>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
                                 continue;
                             }
 
-                            if((signatureBitset
-                                    & std::get<BitsetType>(entities[i]))
-                                == signatureBitset)
-                            {
-                                Helper::callPtr(i, *this, function, context);
+                            if((*std::get<2>(*data)
+                                    & std::get<BitsetType>(
+                                        std::get<1>(*data)->at(i)))
+                                    == *std::get<2>(*data)) {
+                                Helper::callPtr(i, *std::get<0>(*data), std::get<5>(*data), std::get<4>(*data));
                             }
                         }
-                    },
-                        begin,
-                        end);
+                    }, &fnDataAr.at(i));
                 }
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
-                    threads[i].join();
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
         }
@@ -859,7 +877,7 @@ namespace EC
         template <typename Signature, typename Function>
         std::size_t addForMatchingFunction(
             Function&& function,
-            void* context = nullptr)
+            void* userData = nullptr)
         {
             while(forMatchingFunctions.find(functionIndex)
                 != forMatchingFunctions.end())
@@ -882,54 +900,63 @@ namespace EC
                 functionIndex,
                 std::make_tuple(
                     signatureBitset,
-                    context,
+                    userData,
                     [function, helper, this]
-                        (std::size_t threadCount,
+                        (const bool useThreadPool,
                         std::vector<std::size_t> matching,
-                        void* context)
+                        void* userData)
                 {
-                    if(threadCount <= 1 || matching.size() < threadCount)
+                    if(!useThreadPool)
                     {
                         for(auto eid : matching)
                         {
                             if(isAlive(eid))
                             {
                                 helper.callInstancePtr(
-                                    eid, *this, &function, context);
+                                    eid, *this, &function, userData);
                             }
                         }
                     }
                     else
                     {
-                        std::vector<std::thread> threads(threadCount);
-                        std::size_t s = matching.size() / threadCount;
-                        for(std::size_t i = 0; i < threadCount; ++ i)
-                        {
+                        using TPFnDataType = std::tuple<Manager*, EntitiesType*, std::array<std::size_t, 2>, void*, const std::vector<std::size_t>*>;
+                        std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                        std::size_t s = matching.size() / ThreadCount;
+                        for(std::size_t i = 0; i < ThreadCount; ++i) {
                             std::size_t begin = s * i;
                             std::size_t end;
-                            if(i == threadCount - 1) {
+                            if(i == ThreadCount - 1) {
                                 end = matching.size();
                             } else {
                                 end = s * (i + 1);
                             }
-                            threads[i] = std::thread(
-                                [this, &function, &helper, &context, &matching]
-                                    (std::size_t begin,
-                                    std::size_t end) {
-                                for(std::size_t j = begin; j < end; ++j)
-                                {
-                                    if(isAlive(matching[j]))
-                                    {
+                            if(begin == end) {
+                                continue;
+                            }
+                            std::get<0>(fnDataAr.at(i)) = this;
+                            std::get<1>(fnDataAr.at(i)) = &entities;
+                            std::get<2>(fnDataAr.at(i)) = {begin, end};
+                            std::get<3>(fnDataAr.at(i)) = userData;
+                            std::get<4>(fnDataAr.at(i)) = &matching;
+                            threadPool.queueFn([function, helper] (void* ud) {
+                                auto *data = static_cast<TPFnDataType*>(ud);
+                                for(std::size_t i = std::get<2>(*data).at(0);
+                                        i < std::get<2>(*data).at(1);
+                                        ++i) {
+                                    if(std::get<0>(*data)->isAlive(std::get<4>(*data)->at(i))) {
                                         helper.callInstancePtr(
-                                            matching[j], *this, &function, context);
+                                            std::get<4>(*data)->at(i),
+                                            *std::get<0>(*data),
+                                            &function,
+                                            std::get<3>(*data));
                                     }
                                 }
-                            },
-                            begin, end);
+                            }, &fnDataAr.at(i));
                         }
-                        for(std::size_t i = 0; i < threadCount; ++i)
-                        {
-                            threads[i].join();
+                        threadPool.wakeThreads();
+                        while(!threadPool.isAllThreadsWaiting()) {
+                            std::this_thread::sleep_for(std::chrono::milliseconds(5));
                         }
                     }
                 })));
@@ -939,11 +966,11 @@ namespace EC
 
     private:
         std::vector<std::vector<std::size_t> > getMatchingEntities(
-            std::vector<BitsetType*> bitsets, std::size_t threadCount = 1)
+            std::vector<BitsetType*> bitsets, const bool useThreadPool = false)
         {
             std::vector<std::vector<std::size_t> > matchingV(bitsets.size());
 
-            if(threadCount <= 1 || currentSize <= threadCount)
+            if(!useThreadPool)
             {
                 for(std::size_t i = 0; i < currentSize; ++i)
                 {
@@ -963,63 +990,52 @@ namespace EC
             }
             else
             {
-                std::vector<std::thread> threads(threadCount);
-                std::size_t s = currentSize / threadCount;
-                std::mutex mutex;
+                using TPFnDataType = std::tuple<Manager*, std::array<std::size_t, 2>, std::vector<std::vector<std::size_t> >*, const std::vector<BitsetType*>*, EntitiesType*, std::mutex*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
 
-                if(s == 0) {
-                    for(std::size_t i = 0; i < currentSize; ++i) {
-                        threads[i] = std::thread(
-                            [this, &matchingV, &bitsets, &mutex] (std::size_t idx) {
-                            if(!isAlive(idx)) {
-                                return;
+                std::size_t s = currentSize / ThreadCount;
+                std::mutex mutex;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
+                    std::size_t begin = s * i;
+                    std::size_t end;
+                    if(i == ThreadCount - 1) {
+                        end = currentSize;
+                    } else {
+                        end = s * (i + 1);
+                    }
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = {begin, end};
+                    std::get<2>(fnDataAr.at(i)) = &matchingV;
+                    std::get<3>(fnDataAr.at(i)) = &bitsets;
+                    std::get<4>(fnDataAr.at(i)) = &entities;
+                    std::get<5>(fnDataAr.at(i)) = &mutex;
+                    threadPool.queueFn([] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        for(std::size_t i = std::get<1>(*data).at(0);
+                                i < std::get<1>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
+                                continue;
                             }
-                            for(std::size_t k = 0; k < bitsets.size(); ++k)
-                            {
-                                if(((*bitsets[k]) &
-                                    std::get<BitsetType>(entities[idx]))
-                                   == (*bitsets[k]))
-                                {
-                                    std::lock_guard<std::mutex> guard(mutex);
-                                    matchingV[k].push_back(idx);
+                            for(std::size_t j = 0;
+                                    j < std::get<3>(*data)->size();
+                                    ++j) {
+                                if(((*std::get<3>(*data)->at(j))
+                                            & std::get<BitsetType>(std::get<4>(*data)->at(i)))
+                                        == (*std::get<3>(*data)->at(j))) {
+                                    std::lock_guard<std::mutex> lock(*std::get<5>(*data));
+                                    std::get<2>(*data)->at(j).push_back(i);
                                 }
                             }
-                        }, i);
-                    }
-                    for(std::size_t i = 0; i < currentSize; ++i) {
-                        threads[i].join();
-                    }
-                } else {
-                    for (std::size_t i = 0; i < threadCount; ++i) {
-                        std::size_t begin = s * i;
-                        std::size_t end;
-                        if (i == threadCount - 1) {
-                            end = currentSize;
-                        } else {
-                            end = s * (i + 1);
                         }
-
-                        threads[i] = std::thread(
-                                [this, &matchingV, &bitsets, &mutex]
-                                        (std::size_t begin, std::size_t end) {
-                                    for (std::size_t j = begin; j < end; ++j) {
-                                        if (!isAlive(j)) {
-                                            continue;
-                                        }
-                                        for (std::size_t k = 0; k < bitsets.size(); ++k) {
-                                            if (((*bitsets[k]) &
-                                                 std::get<BitsetType>(entities[j]))
-                                                == (*bitsets[k])) {
-                                                std::lock_guard<std::mutex> guard(mutex);
-                                                matchingV[k].push_back(j);
-                                            }
-                                        }
-                                    }
-                                }, begin, end);
-                    }
-                    for (std::size_t i = 0; i < threadCount; ++i) {
-                        threads[i].join();
-                    }
+                    }, &fnDataAr.at(i));
+                }
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
 
@@ -1031,12 +1047,13 @@ namespace EC
         /*!
             \brief Call all stored functions.
 
-            The first (and only) parameter can be optionally used to specify the
-            number of threads to use when calling the functions. Otherwise, this
-            function is by default not multi-threaded.
-            Note that multi-threading is based on splitting the task of calling
-            the functions across sections of entities. Thus if there are only
-            a small amount of entities in the manager, then using multiple
+            The first (and only) parameter can be optionally used to enable the
+            use of the internal ThreadPool to call all stored functions in
+            parallel. Using the value false (which is the default) will not use
+            the ThreadPool and run all stored functions sequentially on the main
+            thread.  Note that multi-threading is based on splitting the task of
+            calling the functions across sections of entities. Thus if there are
+            only a small amount of entities in the manager, then using multiple
             threads may not have as great of a speed-up.
 
             Example:
@@ -1058,7 +1075,7 @@ namespace EC
                 manager.clearForMatchingFunctions();
             \endcode
         */
-        void callForMatchingFunctions(std::size_t threadCount = 1)
+        void callForMatchingFunctions(const bool useThreadPool = false)
         {
             std::vector<BitsetType*> bitsets;
             for(auto iter = forMatchingFunctions.begin();
@@ -1069,7 +1086,7 @@ namespace EC
             }
 
             std::vector<std::vector<std::size_t> > matching =
-                getMatchingEntities(bitsets, threadCount);
+                getMatchingEntities(bitsets, useThreadPool);
 
             std::size_t i = 0;
             for(auto iter = forMatchingFunctions.begin();
@@ -1077,20 +1094,21 @@ namespace EC
                 ++iter)
             {
                 std::get<2>(iter->second)(
-                    threadCount, matching[i++], std::get<1>(iter->second));
+                    useThreadPool, matching[i++], std::get<1>(iter->second));
             }
         }
 
         /*!
             \brief Call a specific stored function.
 
-            A second parameter can be optionally used to specify the number
-            of threads to use when calling the function. Otherwise, this
-            function is by default not multi-threaded.
-            Note that multi-threading is based on splitting the task of calling
-            the function across sections of entities. Thus if there are only
-            a small amount of entities in the manager, then using multiple
-            threads may not have as great of a speed-up.
+            The second parameter can be optionally used to enable the use of the
+            internal ThreadPool to call the stored function in parallel. Using
+            the value false (which is the default) will not use the ThreadPool
+            and run the stored function sequentially on the main thread.  Note
+            that multi-threading is based on splitting the task of calling the
+            functions across sections of entities. Thus if there are only a
+            small amount of entities in the manager, then using multiple threads
+            may not have as great of a speed-up.
 
             Example:
             \code{.cpp}
@@ -1110,7 +1128,7 @@ namespace EC
             \return False if a function with the given id does not exist.
         */
         bool callForMatchingFunction(std::size_t id,
-            std::size_t threadCount = 1)
+                                     const bool useThreadPool = false)
         {
             auto iter = forMatchingFunctions.find(id);
             if(iter == forMatchingFunctions.end())
@@ -1119,9 +1137,9 @@ namespace EC
             }
             std::vector<std::vector<std::size_t> > matching =
                 getMatchingEntities(std::vector<BitsetType*>{
-                    &std::get<BitsetType>(iter->second)}, threadCount);
+                    &std::get<BitsetType>(iter->second)}, useThreadPool);
             std::get<2>(iter->second)(
-                threadCount, matching[0], std::get<1>(iter->second));
+                useThreadPool, matching[0], std::get<1>(iter->second));
             return true;
         }
 
@@ -1263,12 +1281,12 @@ namespace EC
 
             \return True if id is valid and context was updated
         */
-        bool changeForMatchingFunctionContext(std::size_t id, void* context)
+        bool changeForMatchingFunctionContext(std::size_t id, void* userData)
         {
             auto f = forMatchingFunctions.find(id);
             if(f != forMatchingFunctions.end())
             {
-                std::get<1>(f->second) = context;
+                std::get<1>(f->second) = userData;
                 return true;
             }
             return false;
@@ -1317,11 +1335,11 @@ namespace EC
         template <typename SigList, typename FTuple>
         void forMatchingSignatures(
             FTuple fTuple,
-            void* context = nullptr,
-            const std::size_t threadCount = 1)
+            void* userData = nullptr,
+            const bool useThreadPool = false)
         {
-            std::vector<std::vector<std::size_t> > multiMatchingEntities(
-                SigList::size);
+            std::vector<std::vector<std::size_t> >
+                multiMatchingEntities(SigList::size);
             BitsetType signatureBitsets[SigList::size];
 
             // generate bitsets for each signature
@@ -1333,7 +1351,7 @@ namespace EC
             });
 
             // find and store entities matching signatures
-            if(threadCount <= 1)
+            if(!useThreadPool)
             {
                 for(std::size_t eid = 0; eid < currentSize; ++eid)
                 {
@@ -1354,48 +1372,49 @@ namespace EC
             }
             else
             {
-                std::vector<std::thread> threads(threadCount);
-                std::mutex mutexes[SigList::size];
-                std::size_t s = currentSize / threadCount;
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
+                using TPFnDataType = std::tuple<Manager*, std::array<std::size_t, 2>, std::vector<std::vector<std::size_t> >*, BitsetType*, std::mutex*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                std::mutex mutex;
+                std::size_t s = currentSize / ThreadCount;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
                     std::size_t begin = s * i;
                     std::size_t end;
-                    if(i == threadCount - 1)
-                    {
+                    if(i == ThreadCount - 1) {
                         end = currentSize;
-                    }
-                    else
-                    {
+                    } else {
                         end = s * (i + 1);
                     }
-                    threads[i] = std::thread(
-                    [this, &mutexes, &multiMatchingEntities, &signatureBitsets]
-                    (std::size_t begin, std::size_t end)
-                    {
-                        for(std::size_t j = begin; j < end; ++j)
-                        {
-                            if(!isAlive(j))
-                            {
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = {begin, end};
+                    std::get<2>(fnDataAr.at(i)) = &multiMatchingEntities;
+                    std::get<3>(fnDataAr.at(i)) = signatureBitsets;
+                    std::get<4>(fnDataAr.at(i)) = &mutex;
+
+                    threadPool.queueFn([] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        for(std::size_t i = std::get<1>(*data).at(0);
+                                i < std::get<1>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
                                 continue;
                             }
-                            for(std::size_t k = 0; k < SigList::size; ++k)
-                            {
-                                if((signatureBitsets[k]
-                                    & std::get<BitsetType>(entities[j]))
-                                        == signatureBitsets[k])
-                                {
-                                    std::lock_guard<std::mutex> guard(
-                                        mutexes[k]);
-                                    multiMatchingEntities[k].push_back(j);
+                            for(std::size_t j = 0; j < SigList::size; ++j) {
+                                if((std::get<3>(*data)[j] & std::get<BitsetType>(std::get<0>(*data)->entities[i]))
+                                        == std::get<3>(*data)[j]) {
+                                    std::lock_guard<std::mutex> lock(*std::get<4>(*data));
+                                    std::get<2>(*data)->at(j).push_back(i);
                                 }
                             }
                         }
-                    }, begin, end);
+                    }, &fnDataAr.at(i));
                 }
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
-                    threads[i].join();
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
 
@@ -1403,7 +1422,7 @@ namespace EC
             EC::Meta::forEachDoubleTuple(
                 EC::Meta::Morph<SigList, std::tuple<> >{},
                 fTuple,
-                [this, &multiMatchingEntities, &threadCount, &context]
+                [this, &multiMatchingEntities, useThreadPool, &userData]
                 (auto sig, auto func, auto index)
                 {
                     using SignatureComponents =
@@ -1413,55 +1432,51 @@ namespace EC
                         EC::Meta::Morph<
                             SignatureComponents,
                             ForMatchingSignatureHelper<> >;
-                    if(threadCount <= 1)
-                    {
-                        for(const auto& id : multiMatchingEntities[index])
-                        {
-                            if(isAlive(id))
-                            {
-                                Helper::call(id, *this, func, context);
+                    if(!useThreadPool) {
+                        for(const auto& id : multiMatchingEntities[index]) {
+                            if(isAlive(id)) {
+                                Helper::call(id, *this, func, userData);
                             }
                         }
-                    }
-                    else
-                    {
-                        std::vector<std::thread> threads(threadCount);
+                    } else {
+                        using TPFnType = std::tuple<Manager*, void*, std::array<std::size_t, 2>, std::vector<std::vector<std::size_t> > *, std::size_t>;
+                        std::array<TPFnType, ThreadCount> fnDataAr;
                         std::size_t s = multiMatchingEntities[index].size()
-                            / threadCount;
-                        for(std::size_t i = 0; i < threadCount; ++i)
-                        {
+                            / ThreadCount;
+                        for(unsigned int i = 0; i < ThreadCount; ++i) {
                             std::size_t begin = s * i;
                             std::size_t end;
-                            if(i == threadCount - 1)
-                            {
+                            if(i == ThreadCount - 1) {
                                 end = multiMatchingEntities[index].size();
-                            }
-                            else
-                            {
+                            } else {
                                 end = s * (i + 1);
                             }
-                            threads[i] = std::thread(
-                            [this, &multiMatchingEntities, &index, &func,
-                                &context]
-                            (std::size_t begin, std::size_t end)
-                            {
-                                for(std::size_t j = begin; j < end;
-                                    ++j)
-                                {
-                                    if(isAlive(multiMatchingEntities[index][j]))
-                                    {
+                            if(begin == end) {
+                                continue;
+                            }
+                            std::get<0>(fnDataAr.at(i)) = this;
+                            std::get<1>(fnDataAr.at(i)) = userData;
+                            std::get<2>(fnDataAr.at(i)) = {begin, end};
+                            std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities;
+                            std::get<4>(fnDataAr.at(i)) = index;
+                            threadPool.queueFn([&func] (void *ud) {
+                                auto *data = static_cast<TPFnType*>(ud);
+                                for(std::size_t i = std::get<2>(*data).at(0);
+                                        i < std::get<2>(*data).at(1);
+                                        ++i) {
+                                    if(std::get<0>(*data)->isAlive(std::get<3>(*data)->at(std::get<4>(*data)).at(i))) {
                                         Helper::call(
-                                            multiMatchingEntities[index][j],
-                                            *this,
+                                            std::get<3>(*data)->at(std::get<4>(*data)).at(i),
+                                            *std::get<0>(*data),
                                             func,
-                                            context);
+                                            std::get<1>(*data));
                                     }
                                 }
-                            }, begin, end);
+                            }, &fnDataAr.at(i));
                         }
-                        for(std::size_t i = 0; i < threadCount; ++i)
-                        {
-                            threads[i].join();
+                        threadPool.wakeThreads();
+                        while(!threadPool.isAllThreadsWaiting()) {
+                            std::this_thread::sleep_for(std::chrono::milliseconds(5));
                         }
                     }
                 }
@@ -1513,8 +1528,8 @@ namespace EC
         */
         template <typename SigList, typename FTuple>
         void forMatchingSignaturesPtr(FTuple fTuple,
-            void* context = nullptr,
-            std::size_t threadCount = 1)
+            void* userData = nullptr,
+            const bool useThreadPool = false)
         {
             std::vector<std::vector<std::size_t> > multiMatchingEntities(
                 SigList::size);
@@ -1529,7 +1544,7 @@ namespace EC
             });
 
             // find and store entities matching signatures
-            if(threadCount <= 1)
+            if(!useThreadPool)
             {
                 for(std::size_t eid = 0; eid < currentSize; ++eid)
                 {
@@ -1550,48 +1565,49 @@ namespace EC
             }
             else
             {
-                std::vector<std::thread> threads(threadCount);
-                std::mutex mutexes[SigList::size];
-                std::size_t s = currentSize / threadCount;
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
+                using TPFnDataType = std::tuple<Manager*, std::array<std::size_t, 2>, std::vector<std::vector<std::size_t> >*, BitsetType*, std::mutex*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                std::mutex mutex;
+                std::size_t s = currentSize / ThreadCount;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
                     std::size_t begin = s * i;
                     std::size_t end;
-                    if(i == threadCount - 1)
-                    {
+                    if(i == ThreadCount - 1) {
                         end = currentSize;
-                    }
-                    else
-                    {
+                    } else {
                         end = s * (i + 1);
                     }
-                    threads[i] = std::thread(
-                    [this, &mutexes, &multiMatchingEntities, &signatureBitsets]
-                    (std::size_t begin, std::size_t end)
-                    {
-                        for(std::size_t j = begin; j < end; ++j)
-                        {
-                            if(!isAlive(j))
-                            {
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = {begin, end};
+                    std::get<2>(fnDataAr.at(i)) = &multiMatchingEntities;
+                    std::get<3>(fnDataAr.at(i)) = signatureBitsets;
+                    std::get<4>(fnDataAr.at(i)) = &mutex;
+
+                    threadPool.queueFn([] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        for(std::size_t i = std::get<1>(*data).at(0);
+                                i < std::get<1>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
                                 continue;
                             }
-                            for(std::size_t k = 0; k < SigList::size; ++k)
-                            {
-                                if((signatureBitsets[k]
-                                    & std::get<BitsetType>(entities[j]))
-                                        == signatureBitsets[k])
-                                {
-                                    std::lock_guard<std::mutex> guard(
-                                        mutexes[k]);
-                                    multiMatchingEntities[k].push_back(j);
+                            for(std::size_t j = 0; j < SigList::size; ++j) {
+                                if((std::get<3>(*data)[j] & std::get<BitsetType>(std::get<0>(*data)->entities[i]))
+                                        == std::get<3>(*data)[j]) {
+                                    std::lock_guard<std::mutex> lock(*std::get<4>(*data));
+                                    std::get<2>(*data)->at(j).push_back(i);
                                 }
                             }
                         }
-                    }, begin, end);
+                    }, &fnDataAr.at(i));
                 }
-                for(std::size_t i = 0; i < threadCount; ++i)
-                {
-                    threads[i].join();
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
 
@@ -1599,7 +1615,7 @@ namespace EC
             EC::Meta::forEachDoubleTuple(
                 EC::Meta::Morph<SigList, std::tuple<> >{},
                 fTuple,
-                [this, &multiMatchingEntities, &threadCount, &context]
+                [this, &multiMatchingEntities, useThreadPool, &userData]
                 (auto sig, auto func, auto index)
                 {
                     using SignatureComponents =
@@ -1609,55 +1625,56 @@ namespace EC
                         EC::Meta::Morph<
                             SignatureComponents,
                             ForMatchingSignatureHelper<> >;
-                    if(threadCount <= 1)
+                    if(!useThreadPool)
                     {
                         for(const auto& id : multiMatchingEntities[index])
                         {
                             if(isAlive(id))
                             {
-                                Helper::callPtr(id, *this, func, context);
+                                Helper::callPtr(id, *this, func, userData);
                             }
                         }
                     }
                     else
                     {
-                        std::vector<std::thread> threads(threadCount);
+                        using TPFnType = std::tuple<Manager*, void*, std::array<std::size_t, 2>, std::vector<std::vector<std::size_t> > *, std::size_t>;
+                        std::array<TPFnType, ThreadCount> fnDataAr;
                         std::size_t s = multiMatchingEntities[index].size()
-                            / threadCount;
-                        for(std::size_t i = 0; i < threadCount; ++i)
-                        {
+                            / ThreadCount;
+                        for(unsigned int i = 0; i < ThreadCount; ++i) {
                             std::size_t begin = s * i;
                             std::size_t end;
-                            if(i == threadCount - 1)
-                            {
+                            if(i == ThreadCount - 1) {
                                 end = multiMatchingEntities[index].size();
-                            }
-                            else
-                            {
+                            } else {
                                 end = s * (i + 1);
                             }
-                            threads[i] = std::thread(
-                            [this, &multiMatchingEntities, &index, &func,
-                                &context]
-                            (std::size_t begin, std::size_t end)
-                            {
-                                for(std::size_t j = begin; j < end;
-                                    ++j)
-                                {
-                                    if(isAlive(multiMatchingEntities[index][j]))
-                                    {
+                            if(begin == end) {
+                                continue;
+                            }
+                            std::get<0>(fnDataAr.at(i)) = this;
+                            std::get<1>(fnDataAr.at(i)) = userData;
+                            std::get<2>(fnDataAr.at(i)) = {begin, end};
+                            std::get<3>(fnDataAr.at(i)) = &multiMatchingEntities;
+                            std::get<4>(fnDataAr.at(i)) = index;
+                            threadPool.queueFn([&func] (void *ud) {
+                                auto *data = static_cast<TPFnType*>(ud);
+                                for(std::size_t i = std::get<2>(*data).at(0);
+                                        i < std::get<2>(*data).at(1);
+                                        ++i) {
+                                    if(std::get<0>(*data)->isAlive(std::get<3>(*data)->at(std::get<4>(*data)).at(i))) {
                                         Helper::callPtr(
-                                            multiMatchingEntities[index][j],
-                                            *this,
+                                            std::get<3>(*data)->at(std::get<4>(*data)).at(i),
+                                            *std::get<0>(*data),
                                             func,
-                                            context);
+                                            std::get<1>(*data));
                                     }
                                 }
-                            }, begin, end);
+                            }, &fnDataAr.at(i));
                         }
-                        for(std::size_t i = 0; i < threadCount; ++i)
-                        {
-                            threads[i].join();
+                        threadPool.wakeThreads();
+                        while(!threadPool.isAllThreadsWaiting()) {
+                            std::this_thread::sleep_for(std::chrono::milliseconds(5));
                         }
                     }
                 }
@@ -1675,9 +1692,9 @@ namespace EC
          * query component/tag data.
          */
         template <typename Signature>
-        void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, std::size_t threadCount = 1) {
+        void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) {
             const BitsetType signatureBitset = BitsetType::template generateBitset<Signature>();
-            if(threadCount <= 1) {
+            if(!useThreadPool) {
                 for(std::size_t i = 0; i < currentSize; ++i) {
                     if(!std::get<bool>(entities[i])) {
                         continue;
@@ -1686,36 +1703,42 @@ namespace EC
                     }
                 }
             } else {
-                std::vector<std::thread> threads(threadCount);
-                const std::size_t s = currentSize / threadCount;
-                for(std::size_t i = 0; i < threadCount; ++i) {
-                    const std::size_t begin = s * i;
-                    const std::size_t end =
-                        i == threadCount - 1 ?
-                            currentSize :
-                            s * (i + 1);
-                    threads[i] = std::thread(
-                        [this] (const std::size_t begin,
-                                const std::size_t end,
-                                const BitsetType signatureBitset,
-                                ForMatchingFn fn,
-                                void *userData) {
-                            for(std::size_t i = begin; i < end; ++i) {
-                                if(!std::get<bool>(entities[i])) {
-                                    continue;
-                                } else if((signatureBitset & std::get<BitsetType>(entities[i])) == signatureBitset) {
-                                    fn(i, this, userData);
-                                }
+                using TPFnDataType = std::tuple<Manager*, EntitiesType*, const BitsetType*, std::array<std::size_t, 2>, void*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                std::size_t s = currentSize / ThreadCount;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
+                    std::size_t begin = s * i;
+                    std::size_t end;
+                    if(i == ThreadCount - 1) {
+                        end = currentSize;
+                    } else {
+                        end = s * (i + 1);
+                    }
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = &entities;
+                    std::get<2>(fnDataAr.at(i)) = &signatureBitset;
+                    std::get<3>(fnDataAr.at(i)) = {begin, end};
+                    std::get<4>(fnDataAr.at(i)) = userData;
+                    threadPool.queueFn([&fn] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        for(std::size_t i = std::get<3>(*data).at(0);
+                                i < std::get<3>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
+                                continue;
+                            } else if((*std::get<2>(*data) & std::get<BitsetType>(std::get<1>(*data)->at(i))) == *std::get<2>(*data)) {
+                                fn(i, std::get<0>(*data), std::get<4>(*data));
                             }
-                        },
-                        begin,
-                        end,
-                        signatureBitset,
-                        fn,
-                        userData);
+                        }
+                    }, &fnDataAr.at(i));
                 }
-                for(std::size_t i = 0; i < threadCount; ++i) {
-                    threads[i].join();
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
         }
@@ -1730,8 +1753,8 @@ namespace EC
          * defined typedef of type ForMatchingFn.
          */
         template <typename Iterable>
-        void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userPtr = nullptr, std::size_t threadCount = 1) {
-            if(threadCount <= 1) {
+        void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) {
+            if(!useThreadPool) {
                 bool isValid;
                 for(std::size_t i = 0; i < currentSize; ++i) {
                     if(!std::get<bool>(entities[i])) {
@@ -1747,41 +1770,55 @@ namespace EC
                     }
                     if(!isValid) { continue; }
 
-                    fn(i, this, userPtr);
+                    fn(i, this, userData);
                 }
             } else {
-                std::vector<std::thread> threads(threadCount);
-                std::size_t s = currentSize / threadCount;
-                for(std::size_t i = 0; i < threadCount; ++i) {
+                using TPFnDataType = std::tuple<Manager*, EntitiesType*, Iterable*, std::array<std::size_t, 2>, void*>;
+                std::array<TPFnDataType, ThreadCount> fnDataAr;
+
+                std::size_t s = currentSize / ThreadCount;
+                for(std::size_t i = 0; i < ThreadCount; ++i) {
                     std::size_t begin = s * i;
-                    std::size_t end =
-                        i == threadCount - 1 ?
-                            currentSize :
-                            s * (i + 1);
-                    threads[i] = std::thread(
-                        [this, &fn, &iterable, userPtr] (std::size_t begin, std::size_t end) {
-                            bool isValid;
-                            for(std::size_t i = begin; i < end; ++i) {
-                                if(!std::get<bool>(this->entities[i])) {
-                                    continue;
+                    std::size_t end;
+                    if(i == ThreadCount - 1) {
+                        end = currentSize;
+                    } else {
+                        end = s * (i + 1);
+                    }
+                    if(begin == end) {
+                        continue;
+                    }
+                    std::get<0>(fnDataAr.at(i)) = this;
+                    std::get<1>(fnDataAr.at(i)) = &entities;
+                    std::get<2>(fnDataAr.at(i)) = &iterable;
+                    std::get<3>(fnDataAr.at(i)) = {begin, end};
+                    std::get<4>(fnDataAr.at(i)) = userData;
+                    threadPool.queueFn([&fn] (void *ud) {
+                        auto *data = static_cast<TPFnDataType*>(ud);
+                        bool isValid;
+                        for(std::size_t i = std::get<3>(*data).at(0);
+                                i < std::get<3>(*data).at(1);
+                                ++i) {
+                            if(!std::get<0>(*data)->isAlive(i)) {
+                                continue;
+                            }
+                            isValid = true;
+                            for(const auto& integralValue : *std::get<2>(*data)) {
+                                if(!std::get<BitsetType>(std::get<1>(*data)->at(i)).getCombinedBit(integralValue)) {
+                                    isValid = false;
+                                    break;
                                 }
+                            }
+                            if(!isValid) { continue; }
 
-                                isValid = true;
-                                for(const auto& integralValue : iterable) {
-                                    if(!std::get<BitsetType>(entities[i]).getCombinedBit(integralValue)) {
-                                        isValid = false;
-                                        break;
-                                    }
-                                }
-                                if(!isValid) { continue; }
+                            fn(i, std::get<0>(*data), std::get<4>(*data));
 
-                                fn(i, this, userPtr);
-                            }
-                        },
-                    begin, end);
+                        }
+                    }, &fnDataAr.at(i));
                 }
-                for(std::size_t i = 0; i < threadCount; ++i) {
-                    threads[i].join();
+                threadPool.wakeThreads();
+                while(!threadPool.isAllThreadsWaiting()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                 }
             }
         }
index 24b8b116f2986b3d933eba34e29479bd73ea8650..31059038759b57205feb94eefe9678a605f9eb18 100644 (file)
@@ -464,7 +464,7 @@ TEST(EC, MultiThreaded)
             c->y = 2;
         },
         nullptr,
-        2
+        true
     );
 
     for(unsigned int i = 0; i < 17; ++i)
@@ -490,7 +490,7 @@ TEST(EC, MultiThreaded)
             c->y = 4;
         },
         nullptr,
-        8
+        true
     );
 
     for(unsigned int i = 0; i < 3; ++i)
@@ -516,7 +516,7 @@ TEST(EC, MultiThreaded)
         }
     );
 
-    manager.callForMatchingFunctions(2);
+    manager.callForMatchingFunctions(true);
 
     for(unsigned int i = 0; i < 17; ++i)
     {
@@ -531,7 +531,7 @@ TEST(EC, MultiThreaded)
         }
     );
 
-    manager.callForMatchingFunction(f1, 4);
+    manager.callForMatchingFunction(f1, true);
 
     for(unsigned int i = 0; i < 17; ++i)
     {
@@ -544,7 +544,7 @@ TEST(EC, MultiThreaded)
         manager.deleteEntity(i);
     }
 
-    manager.callForMatchingFunction(f0, 8);
+    manager.callForMatchingFunction(f0, true);
 
     for(unsigned int i = 0; i < 4; ++i)
     {
@@ -710,7 +710,7 @@ TEST(EC, ForMatchingSignatures)
             c0->y = 2;
         }),
         nullptr,
-        3
+        true
     );
     
     for(auto iter = cx.begin(); iter != cx.end(); ++iter)
@@ -850,7 +850,7 @@ TEST(EC, forMatchingPtrs)
         &func0
     );
     manager.forMatchingSignaturePtr<TypeList<C0, T0> >(
-        &func1
+        &func1, nullptr, true
     );
 
     for(auto eid : e)
@@ -1098,7 +1098,7 @@ TEST(EC, forMatchingSimple) {
             C0 *c0 = manager->getEntityData<C0>(id);
             c0->x += 10;
             c0->y += 10;
-        }, nullptr, 3);
+        }, nullptr, true);
 
     // verify
     {
@@ -1296,7 +1296,7 @@ TEST(EC, forMatchingIterableFn)
             c->x += 100;
             c->y += 100;
         };
-        manager.forMatchingIterable(iterable, fn, nullptr, 3);
+        manager.forMatchingIterable(iterable, fn, nullptr, true);
     }
 
     {
@@ -1322,7 +1322,7 @@ TEST(EC, forMatchingIterableFn)
             c->x += 1000;
             c->y += 1000;
         };
-        manager.forMatchingIterable(iterable, fn, nullptr, 3);
+        manager.forMatchingIterable(iterable, fn, nullptr, true);
     }
 
     {
@@ -1365,7 +1365,7 @@ TEST(EC, MultiThreadedForMatching) {
     EXPECT_TRUE(manager.isAlive(first));
     EXPECT_TRUE(manager.isAlive(second));
 
-    manager.callForMatchingFunction(fnIdx, 2);
+    manager.callForMatchingFunction(fnIdx, true);
 
     EXPECT_TRUE(manager.isAlive(first));
     EXPECT_FALSE(manager.isAlive(second));