]> git.seodisparate.com - EntityComponentMetaSystem/commitdiff
Impl nested threaded calls
authorStephen Seo <seo.disparate@gmail.com>
Wed, 15 Jun 2022 12:15:34 +0000 (21:15 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Wed, 15 Jun 2022 12:15:34 +0000 (21:15 +0900)
More testing is probably required to make sure it works properly.

.clang-format [new file with mode: 0644]
src/EC/Manager.hpp
src/EC/ThreadPool.hpp
src/test/ECTest.cpp
src/test/ThreadPoolTest.cpp

diff --git a/.clang-format b/.clang-format
new file mode 100644 (file)
index 0000000..5a992df
--- /dev/null
@@ -0,0 +1,212 @@
+---
+Language:        Cpp
+# BasedOnStyle:  Google
+AccessModifierOffset: -1
+AlignAfterOpenBracket: Align
+AlignArrayOfStructures: None
+AlignConsecutiveMacros: None
+AlignConsecutiveAssignments: None
+AlignConsecutiveBitFields: None
+AlignConsecutiveDeclarations: None
+AlignEscapedNewlines: Left
+AlignOperands:   Align
+AlignTrailingComments: true
+AllowAllArgumentsOnNextLine: true
+AllowAllConstructorInitializersOnNextLine: true
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortEnumsOnASingleLine: true
+AllowShortBlocksOnASingleLine: Never
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: All
+AllowShortLambdasOnASingleLine: All
+AllowShortIfStatementsOnASingleLine: WithoutElse
+AllowShortLoopsOnASingleLine: true
+AlwaysBreakAfterDefinitionReturnType: None
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: true
+AlwaysBreakTemplateDeclarations: Yes
+AttributeMacros:
+  - __capability
+BinPackArguments: true
+BinPackParameters: true
+BraceWrapping:
+  AfterCaseLabel:  false
+  AfterClass:      false
+  AfterControlStatement: Never
+  AfterEnum:       false
+  AfterFunction:   false
+  AfterNamespace:  false
+  AfterObjCDeclaration: false
+  AfterStruct:     false
+  AfterUnion:      false
+  AfterExternBlock: false
+  BeforeCatch:     false
+  BeforeElse:      false
+  BeforeLambdaBody: false
+  BeforeWhile:     false
+  IndentBraces:    false
+  SplitEmptyFunction: true
+  SplitEmptyRecord: true
+  SplitEmptyNamespace: true
+BreakBeforeBinaryOperators: None
+BreakBeforeConceptDeclarations: true
+BreakBeforeBraces: Attach
+BreakBeforeInheritanceComma: false
+BreakInheritanceList: BeforeColon
+BreakBeforeTernaryOperators: true
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeColon
+BreakAfterJavaFieldAnnotations: false
+BreakStringLiterals: true
+ColumnLimit:     80
+CommentPragmas:  '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 4
+Cpp11BracedListStyle: true
+DeriveLineEnding: true
+DerivePointerAlignment: true
+DisableFormat:   false
+EmptyLineAfterAccessModifier: Never
+EmptyLineBeforeAccessModifier: LogicalBlock
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: true
+ForEachMacros:
+  - foreach
+  - Q_FOREACH
+  - BOOST_FOREACH
+IfMacros:
+  - KJ_IF_MAYBE
+IncludeBlocks:   Regroup
+IncludeCategories:
+  - Regex:           '^<ext/.*\.h>'
+    Priority:        2
+    SortPriority:    0
+    CaseSensitive:   false
+  - Regex:           '^<.*\.h>'
+    Priority:        1
+    SortPriority:    0
+    CaseSensitive:   false
+  - Regex:           '^<.*'
+    Priority:        2
+    SortPriority:    0
+    CaseSensitive:   false
+  - Regex:           '.*'
+    Priority:        3
+    SortPriority:    0
+    CaseSensitive:   false
+IncludeIsMainRegex: '([-_](test|unittest))?$'
+IncludeIsMainSourceRegex: ''
+IndentAccessModifiers: false
+IndentCaseLabels: true
+IndentCaseBlocks: false
+IndentGotoLabels: true
+IndentPPDirectives: None
+IndentExternBlock: AfterExternBlock
+IndentRequires:  false
+IndentWidth:     4
+IndentWrappedFunctionNames: false
+InsertTrailingCommas: None
+JavaScriptQuotes: Leave
+JavaScriptWrapImports: true
+KeepEmptyLinesAtTheStartOfBlocks: false
+LambdaBodyIndentation: Signature
+MacroBlockBegin: ''
+MacroBlockEnd:   ''
+MaxEmptyLinesToKeep: 1
+NamespaceIndentation: None
+ObjCBinPackProtocolList: Never
+ObjCBlockIndentWidth: 4
+ObjCBreakBeforeNestedBlockParam: true
+ObjCSpaceAfterProperty: false
+ObjCSpaceBeforeProtocolList: true
+PenaltyBreakAssignment: 2
+PenaltyBreakBeforeFirstCallParameter: 1
+PenaltyBreakComment: 300
+PenaltyBreakFirstLessLess: 120
+PenaltyBreakString: 1000
+PenaltyBreakTemplateDeclaration: 10
+PenaltyExcessCharacter: 1000000
+PenaltyReturnTypeOnItsOwnLine: 200
+PenaltyIndentedWhitespace: 0
+PointerAlignment: Left
+PPIndentWidth:   -1
+RawStringFormats:
+  - Language:        Cpp
+    Delimiters:
+      - cc
+      - CC
+      - cpp
+      - Cpp
+      - CPP
+      - 'c++'
+      - 'C++'
+    CanonicalDelimiter: ''
+    BasedOnStyle:    google
+  - Language:        TextProto
+    Delimiters:
+      - pb
+      - PB
+      - proto
+      - PROTO
+    EnclosingFunctions:
+      - EqualsProto
+      - EquivToProto
+      - PARSE_PARTIAL_TEXT_PROTO
+      - PARSE_TEST_PROTO
+      - PARSE_TEXT_PROTO
+      - ParseTextOrDie
+      - ParseTextProtoOrDie
+      - ParseTestProto
+      - ParsePartialTestProto
+    CanonicalDelimiter: pb
+    BasedOnStyle:    google
+ReferenceAlignment: Pointer
+ReflowComments:  true
+ShortNamespaceLines: 1
+SortIncludes:    CaseSensitive
+SortJavaStaticImport: Before
+SortUsingDeclarations: true
+SpaceAfterCStyleCast: false
+SpaceAfterLogicalNot: false
+SpaceAfterTemplateKeyword: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeCaseColon: false
+SpaceBeforeCpp11BracedList: false
+SpaceBeforeCtorInitializerColon: true
+SpaceBeforeInheritanceColon: true
+SpaceBeforeParens: ControlStatements
+SpaceAroundPointerQualifiers: Default
+SpaceBeforeRangeBasedForLoopColon: true
+SpaceInEmptyBlock: false
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 2
+SpacesInAngles:  Never
+SpacesInConditionalStatement: false
+SpacesInContainerLiterals: true
+SpacesInCStyleCastParentheses: false
+SpacesInLineCommentPrefix:
+  Minimum:         1
+  Maximum:         -1
+SpacesInParentheses: false
+SpacesInSquareBrackets: false
+SpaceBeforeSquareBrackets: false
+BitFieldColonSpacing: Both
+Standard:        Auto
+StatementAttributeLikeMacros:
+  - Q_EMIT
+StatementMacros:
+  - Q_UNUSED
+  - QT_REQUIRE_VERSION
+TabWidth:        4
+UseCRLF:         false
+UseTab:          Never
+WhitespaceSensitiveMacros:
+  - STRINGIZE
+  - PP_STRINGIZE
+  - BOOST_PP_STRINGIZE
+  - NS_SWIFT_NAME
+  - CF_SWIFT_NAME
+...
+
index d82031d8e85e37ddc322946aecbe1e951519c668..ad8d382b077f221561fa7bb106f762e69b886043 100644 (file)
@@ -7,6 +7,7 @@
 #ifndef EC_MANAGER_HPP
 #define EC_MANAGER_HPP
 
+#include <chrono>
 #define EC_INIT_ENTITIES_SIZE 256
 #define EC_GROW_SIZE_AMOUNT 256
 
@@ -107,6 +108,10 @@ namespace EC
         std::vector<std::size_t> deferredDeletions;
         std::mutex deferredDeletionsMutex;
 
+        std::vector<std::size_t> idStack;
+        std::size_t idStackCounter;
+        std::mutex idStackMutex;
+
     public:
         // section for "temporary" structures {{{
         /// Temporary struct used internally by ThreadPool
@@ -196,7 +201,9 @@ namespace EC
             The default capacity is set with macro EC_INIT_ENTITIES_SIZE,
             and will grow by amounts of EC_GROW_SIZE_AMOUNT when needed.
         */
-        Manager()
+        Manager() :
+            threadPool{},
+            idStackCounter(0)
         {
             resize(EC_INIT_ENTITIES_SIZE);
             if(ThreadCount >= 2) {
@@ -206,6 +213,14 @@ namespace EC
             deferringDeletions.store(0);
         }
 
+        ~Manager() {
+            if (threadPool) {
+                while(!threadPool->isNotRunning()) {
+                    std::this_thread::sleep_for(std::chrono::microseconds(30));
+                }
+            }
+        }
+
     private:
         void resize(std::size_t newCapacity)
         {
@@ -750,6 +765,13 @@ namespace EC
             void* userData = nullptr,
             const bool useThreadPool = false)
         {
+            std::size_t current_id;
+            {
+                // push to idStack "call stack"
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                current_id = idStackCounter++;
+                idStack.push_back(current_id);
+            }
             deferringDeletions.fetch_add(1);
             using SignatureComponents =
                 typename EC::Meta::Matching<Signature, ComponentsList>::type;
@@ -826,9 +848,21 @@ namespace EC
                         delete data;
                     }, fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
+            // pop from idStack "call stack"
+            do {
+                {
+                    std::lock_guard<std::mutex> lock(idStackMutex);
+                    if (idStack.back() == current_id) {
+                        idStack.pop_back();
+                        break;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            } while (true);
+
             handleDeferredDeletions();
         }
 
@@ -878,6 +912,13 @@ namespace EC
             void* userData = nullptr,
             const bool useThreadPool = false)
         {
+            std::size_t current_id;
+            {
+                // push to idStack "call stack"
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                current_id = idStackCounter++;
+                idStack.push_back(current_id);
+            }
             deferringDeletions.fetch_add(1);
             using SignatureComponents =
                 typename EC::Meta::Matching<Signature, ComponentsList>::type;
@@ -951,9 +992,21 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
+            // pop from idStack "call stack"
+            do {
+                {
+                    std::lock_guard<std::mutex> lock(idStackMutex);
+                    if (idStack.back() == current_id) {
+                        idStack.pop_back();
+                        break;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            } while (true);
+
             handleDeferredDeletions();
         }
 
@@ -1099,7 +1152,7 @@ namespace EC
                                 }
                             }, &fnDataAr[i]);
                         }
-                        threadPool->easyWakeAndWait();
+                        threadPool->easyStartAndWait();
                     }
                 })));
 
@@ -1180,7 +1233,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
             return matchingV;
@@ -1483,6 +1536,13 @@ namespace EC
             void* userData = nullptr,
             const bool useThreadPool = false)
         {
+            std::size_t current_id;
+            {
+                // push to idStack "call stack"
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                current_id = idStackCounter++;
+                idStack.push_back(current_id);
+            }
             deferringDeletions.fetch_add(1);
             std::vector<std::vector<std::size_t> >
                 multiMatchingEntities(SigList::size);
@@ -1565,7 +1625,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
             // call functions on matching entities
@@ -1630,11 +1690,23 @@ namespace EC
                                 }
                             }, &fnDataAr[i]);
                         }
-                        threadPool->easyWakeAndWait();
+                        threadPool->easyStartAndWait();
                     }
                 }
             );
 
+            // pop from idStack "call stack"
+            do {
+                {
+                    std::lock_guard<std::mutex> lock(idStackMutex);
+                    if (idStack.back() == current_id) {
+                        idStack.pop_back();
+                        break;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            } while (true);
+
             handleDeferredDeletions();
         }
 
@@ -1694,6 +1766,13 @@ namespace EC
             void* userData = nullptr,
             const bool useThreadPool = false)
         {
+            std::size_t current_id;
+            {
+                // push to idStack "call stack"
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                current_id = idStackCounter++;
+                idStack.push_back(current_id);
+            }
             deferringDeletions.fetch_add(1);
             std::vector<std::vector<std::size_t> > multiMatchingEntities(
                 SigList::size);
@@ -1776,7 +1855,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
             // call functions on matching entities
@@ -1846,11 +1925,23 @@ namespace EC
                                 }
                             }, &fnDataAr[i]);
                         }
-                        threadPool->easyWakeAndWait();
+                        threadPool->easyStartAndWait();
                     }
                 }
             );
 
+            // pop from idStack "call stack"
+            do {
+                {
+                    std::lock_guard<std::mutex> lock(idStackMutex);
+                    if (idStack.back() == current_id) {
+                        idStack.pop_back();
+                        break;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            } while (true);
+
             handleDeferredDeletions();
         }
 
@@ -1879,6 +1970,13 @@ namespace EC
         void forMatchingSimple(ForMatchingFn fn,
                                void *userData = nullptr,
                                const bool useThreadPool = false) {
+            std::size_t current_id;
+            {
+                // push to idStack "call stack"
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                current_id = idStackCounter++;
+                idStack.push_back(current_id);
+            }
             deferringDeletions.fetch_add(1);
             const BitsetType signatureBitset =
                 BitsetType::template generateBitset<Signature>();
@@ -1934,9 +2032,21 @@ namespace EC
                         delete data;
                     }, fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
+            // pop from idStack "call stack"
+            do {
+                {
+                    std::lock_guard<std::mutex> lock(idStackMutex);
+                    if (idStack.back() == current_id) {
+                        idStack.pop_back();
+                        break;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            } while (true);
+
             handleDeferredDeletions();
         }
 
@@ -1959,10 +2069,18 @@ namespace EC
             may not have as great of a speed-up.
          */
         template <typename Iterable>
-        void forMatchingIterable(Iterable iterable, 
+        void forMatchingIterable(Iterable iterable,
                                  ForMatchingFn fn,
                                  void* userData = nullptr,
                                  const bool useThreadPool = false) {
+            std::size_t current_id;
+            {
+                // push to idStack "call stack"
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                current_id = idStackCounter++;
+                idStack.push_back(current_id);
+            }
+
             deferringDeletions.fetch_add(1);
             if(!useThreadPool || !threadPool) {
                 bool isValid;
@@ -2031,9 +2149,21 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
 
+            // pop from idStack "call stack"
+            do {
+                {
+                    std::lock_guard<std::mutex> lock(idStackMutex);
+                    if (idStack.back() == current_id) {
+                        idStack.pop_back();
+                        break;
+                    }
+                }
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            } while (true);
+
             handleDeferredDeletions();
         }
     };
index 3611954d4291088718ad487cec5e54df5b89fa90..74f16da7215d1b56075d0e60b8a3bf780e123b36 100644 (file)
@@ -1,99 +1,52 @@
 #ifndef EC_META_SYSTEM_THREADPOOL_HPP
 #define EC_META_SYSTEM_THREADPOOL_HPP
 
-#include <type_traits>
-#include <vector>
-#include <thread>
 #include <atomic>
+#include <chrono>
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
 #include <mutex>
-#include <condition_variable>
 #include <queue>
-#include <functional>
+#include <thread>
 #include <tuple>
-#include <chrono>
-#include <unordered_set>
+#include <vector>
 
 #ifndef NDEBUG
-# include <iostream>
+#include <iostream>
 #endif
 
 namespace EC {
 
 namespace Internal {
-    using TPFnType = std::function<void(void*)>;
-    using TPTupleType = std::tuple<TPFnType, void*>;
-    using TPQueueType = std::queue<TPTupleType>;
-
-    template <unsigned int SIZE>
-    void thread_fn(std::atomic_bool *isAlive,
-                   std::condition_variable *cv,
-                   std::mutex *cvMutex,
-                   Internal::TPQueueType *fnQueue,
-                   std::mutex *queueMutex,
-                   std::atomic_int *waitCount) {
-        bool hasFn = false;
-        Internal::TPTupleType fnTuple;
-        while(isAlive->load()) {
-            hasFn = false;
-            {
-                std::lock_guard<std::mutex> lock(*queueMutex);
-                if(!fnQueue->empty()) {
-                    fnTuple = fnQueue->front();
-                    fnQueue->pop();
-                    hasFn = true;
-                }
-            }
-            if(hasFn) {
-                std::get<0>(fnTuple)(std::get<1>(fnTuple));
-                continue;
-            }
-
-            waitCount->fetch_add(1);
-            {
-                std::unique_lock<std::mutex> lock(*cvMutex);
-                cv->wait(lock);
-            }
-            waitCount->fetch_sub(1);
-        }
-    }
-} // namespace Internal
+using TPFnType = std::function<void(void *)>;
+using TPTupleType = std::tuple<TPFnType, void *>;
+using TPQueueType = std::queue<TPTupleType>;
+using ThreadPtr = std::unique_ptr<std::thread>;
+using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;
+using ThreadStacksType = std::deque<ThreadStackType>;
+using ThreadStacksMutexesT = std::deque<std::mutex>;
+using ThreadCountersT = std::deque<std::atomic_uint>;
+using PointersT =
+    std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *>;
+}  // namespace Internal
 
 /*!
     \brief Implementation of a Thread Pool.
 
-    Note that if SIZE is less than 2, then ThreadPool will not create threads and
-    run queued functions on the calling thread.
+    Note that if SIZE is less than 2, then ThreadPool will not create threads
+    and run queued functions on the calling thread.
 */
-template <unsigned int SIZE>
+template <unsigned int MAXSIZE>
 class ThreadPool {
-public:
-    ThreadPool() {
-        waitCount.store(0);
-        extraThreadCount.store(0);
-        isAlive.store(true);
-        if(SIZE >= 2) {
-            for(unsigned int i = 0; i < SIZE; ++i) {
-                threads.emplace_back(Internal::thread_fn<SIZE>,
-                                     &isAlive,
-                                     &cv,
-                                     &cvMutex,
-                                     &fnQueue,
-                                     &queueMutex,
-                                     &waitCount);
-                threadsIDs.insert(threads.back().get_id());
-            }
-        }
-    }
+   public:
+    ThreadPool()
+        : threadStacks{}, threadStackMutexes{}, fnQueue{}, queueMutex{} {}
 
     ~ThreadPool() {
-        if(SIZE >= 2) {
-            isAlive.store(false);
-            std::this_thread::sleep_for(std::chrono::milliseconds(20));
-            cv.notify_all();
-            for(auto &thread : threads) {
-                thread.join();
-            }
-            std::this_thread::sleep_for(std::chrono::milliseconds(20));
+        while (!isNotRunning()) {
+            std::this_thread::sleep_for(std::chrono::microseconds(30));
         }
     }
 
@@ -104,87 +57,97 @@ public:
         waiting threads which will start pulling functions from the queue to be
         called.
     */
-    void queueFn(std::function<void(void*)>&& fn, void *ud = nullptr) {
+    void queueFn(std::function<void(void *)> &&fn, void *ud = nullptr) {
         std::lock_guard<std::mutex> lock(queueMutex);
         fnQueue.emplace(std::make_tuple(fn, ud));
     }
 
-    /*!
-        \brief Wakes waiting threads to start running queued functions.
+    void startThreads() {
+        if (MAXSIZE >= 2) {
+            checkStacks();
+            auto pointers = newStackEntry();
+            Internal::ThreadStackType *threadStack = std::get<0>(pointers);
+            std::mutex *threadStackMutex = std::get<1>(pointers);
+            std::atomic_uint *aCounter = std::get<2>(pointers);
+            for (unsigned int i = 0; i < MAXSIZE; ++i) {
+                std::thread *newThread = new std::thread(
+                    [](Internal::ThreadStackType *threadStack,
+                       std::mutex *threadStackMutex,
+                       Internal::TPQueueType *fnQueue, std::mutex *queueMutex,
+                       std::atomic_uint *initCount) {
+                        // add id to idStack "call stack"
+                        {
+                            std::lock_guard<std::mutex> lock(*threadStackMutex);
+                            threadStack->push_back(
+                                {Internal::ThreadPtr(nullptr),
+                                 std::this_thread::get_id()});
+                        }
 
-        If SIZE is less than 2, then this function call will block until all the
-        queued functions have been executed on the calling thread.
+                        ++(*initCount);
 
-        If SIZE is 2 or greater, then this function will return immediately after
-        waking one or all threads, depending on the given boolean parameter.
-    */
-    void wakeThreads(const bool wakeAll = true) {
-        if(SIZE >= 2) {
-            // wake threads to pull functions from queue and run them
-            if(wakeAll) {
-                cv.notify_all();
-            } else {
-                cv.notify_one();
-            }
+                        // fetch queued fns and execute them
+                        // fnTuples must live until end of function
+                        std::list<Internal::TPTupleType> fnTuples;
+                        do {
+                            bool fnFound = false;
+                            {
+                                std::lock_guard<std::mutex> lock(*queueMutex);
+                                if (!fnQueue->empty()) {
+                                    fnTuples.emplace_back(
+                                        std::move(fnQueue->front()));
+                                    fnQueue->pop();
+                                    fnFound = true;
+                                }
+                            }
+                            if (fnFound) {
+                                std::get<0>(fnTuples.back())(
+                                    std::get<1>(fnTuples.back()));
+                            } else {
+                                break;
+                            }
+                        } while (true);
 
-            // check if all threads are running a task, and spawn a new thread
-            // if this is the case
-            Internal::TPTupleType fnTuple;
-            bool hasFn = false;
-            if (waitCount.load(std::memory_order_relaxed) == 0) {
-                std::lock_guard<std::mutex> queueLock(queueMutex);
-                if (!fnQueue.empty()) {
-                    fnTuple = fnQueue.front();
-                    fnQueue.pop();
-                    hasFn = true;
+                        // pop id from idStack "call stack"
+                        do {
+                            std::this_thread::sleep_for(
+                                std::chrono::microseconds(15));
+                            if (initCount->load() != MAXSIZE) {
+                                continue;
+                            }
+                            {
+                                std::lock_guard<std::mutex> lock(
+                                    *threadStackMutex);
+                                if (std::get<1>(threadStack->back()) ==
+                                    std::this_thread::get_id()) {
+                                    if (!std::get<0>(threadStack->back())) {
+                                        continue;
+                                    }
+                                    std::get<0>(threadStack->back())->detach();
+                                    threadStack->pop_back();
+                                    if (threadStack->empty()) {
+                                        initCount->store(0);
+                                    }
+                                    break;
+                                }
+                            }
+                        } while (true);
+                    },
+                    threadStack, threadStackMutex, &fnQueue, &queueMutex,
+                    aCounter);
+                while (aCounter->load() != i + 1) {
+                    std::this_thread::sleep_for(std::chrono::microseconds(15));
                 }
+                std::lock_guard<std::mutex> stackLock(*threadStackMutex);
+                std::get<0>(threadStack->at(i)).reset(newThread);
             }
-
-            if (hasFn) {
-#ifndef NDEBUG
-                std::cout << "Spawning extra thread...\n";
-#endif
-                extraThreadCount.fetch_add(1);
-                std::thread newThread = std::thread(
-                        [] (Internal::TPTupleType &&tuple, std::atomic_int *count) {
-                            std::get<0>(tuple)(std::get<1>(tuple));
-#ifndef NDEBUG
-                            std::cout << "Stopping extra thread...\n";
-#endif
-                            count->fetch_sub(1);
-                        },
-                        std::move(fnTuple), &extraThreadCount);
-                newThread.detach();
+            while (aCounter->load() != MAXSIZE) {
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
             }
         } else {
             sequentiallyRunTasks();
         }
     }
 
-    /*!
-        \brief Gets the number of waiting threads.
-
-        If all threads are waiting, this should equal ThreadCount.
-
-        If SIZE is less than 2, then this will always return 0.
-    */
-    int getWaitCount() {
-        return waitCount.load(std::memory_order_relaxed);
-    }
-
-    /*!
-        \brief Returns true if all threads are waiting.
-
-        If SIZE is less than 2, then this will always return true.
-    */
-    bool isAllThreadsWaiting() {
-        if(SIZE >= 2) {
-            return waitCount.load(std::memory_order_relaxed) == SIZE;
-        } else {
-            return true;
-        }
-    }
-
     /*!
         \brief Returns true if the function queue is empty.
     */
@@ -196,43 +159,54 @@ public:
     /*!
         \brief Returns the ThreadCount that this class was created with.
      */
-    constexpr unsigned int getThreadCount() {
-        return SIZE;
-    }
+    constexpr unsigned int getMaxThreadCount() { return MAXSIZE; }
 
-    /*!
-        \brief Wakes all threads and blocks until all queued tasks are finished.
+    void easyStartAndWait() {
+        if (MAXSIZE >= 2) {
+            startThreads();
+            do {
+                std::this_thread::sleep_for(std::chrono::microseconds(30));
 
-        If SIZE is less than 2, then this function call will block until all the
-        queued functions have been executed on the calling thread.
+                bool isQueueEmpty = false;
+                {
+                    std::lock_guard<std::mutex> lock(queueMutex);
+                    isQueueEmpty = fnQueue.empty();
+                }
 
-        If SIZE is 2 or greater, then this function will block until all the
-        queued functions have been executed by the threads in the thread pool.
-     */
-    void easyWakeAndWait() {
-        if(SIZE >= 2) {
-            do {
-                wakeThreads();
-                std::this_thread::sleep_for(std::chrono::microseconds(150));
-            } while(!isQueueEmpty()
-                    || (threadsIDs.find(std::this_thread::get_id()) != threadsIDs.end()
-                         && extraThreadCount.load(std::memory_order_relaxed) != 0));
-//            } while(!isQueueEmpty() || !isAllThreadsWaiting());
+                if (isQueueEmpty) {
+                    break;
+                }
+            } while (true);
         } else {
             sequentiallyRunTasks();
         }
     }
 
-private:
-    std::vector<std::thread> threads;
-    std::unordered_set<std::thread::id> threadsIDs;
-    std::atomic_bool isAlive;
-    std::condition_variable cv;
-    std::mutex cvMutex;
+    bool isNotRunning() {
+        std::lock_guard<std::mutex> lock(dequesMutex);
+        auto tIter = threadStacks.begin();
+        auto mIter = threadStackMutexes.begin();
+        while (tIter != threadStacks.end() &&
+               mIter != threadStackMutexes.end()) {
+            {
+                std::lock_guard<std::mutex> lock(*mIter);
+                if (!tIter->empty()) {
+                    return false;
+                }
+            }
+            ++tIter;
+            ++mIter;
+        }
+        return true;
+    }
+
+   private:
+    Internal::ThreadStacksType threadStacks;
+    Internal::ThreadStacksMutexesT threadStackMutexes;
     Internal::TPQueueType fnQueue;
     std::mutex queueMutex;
-    std::atomic_int waitCount;
-    std::atomic_int extraThreadCount;
+    Internal::ThreadCountersT threadCounters;
+    std::mutex dequesMutex;
 
     void sequentiallyRunTasks() {
         // pull functions from queue and run them on current thread
@@ -241,7 +215,7 @@ private:
         do {
             {
                 std::lock_guard<std::mutex> lock(queueMutex);
-                if(!fnQueue.empty()) {
+                if (!fnQueue.empty()) {
                     hasFn = true;
                     fnTuple = fnQueue.front();
                     fnQueue.pop();
@@ -249,14 +223,50 @@ private:
                     hasFn = false;
                 }
             }
-            if(hasFn) {
+            if (hasFn) {
                 std::get<0>(fnTuple)(std::get<1>(fnTuple));
             }
-        } while(hasFn);
+        } while (hasFn);
     }
 
+    void checkStacks() {
+        std::lock_guard<std::mutex> lock(dequesMutex);
+        if (threadStacks.empty()) {
+            return;
+        }
+
+        bool erased = false;
+        do {
+            erased = false;
+            {
+                std::lock_guard<std::mutex> lock(threadStackMutexes.front());
+                if (threadStacks.front().empty()) {
+                    threadStacks.pop_front();
+                    threadCounters.pop_front();
+                    erased = true;
+                }
+            }
+            if (erased) {
+                threadStackMutexes.pop_front();
+            } else {
+                break;
+            }
+        } while (!threadStacks.empty() && !threadStackMutexes.empty() &&
+                 !threadCounters.empty());
+    }
+
+    Internal::PointersT newStackEntry() {
+        std::lock_guard<std::mutex> lock(dequesMutex);
+        threadStacks.emplace_back();
+        threadStackMutexes.emplace_back();
+        threadCounters.emplace_back();
+        threadCounters.back().store(0);
+
+        return {&threadStacks.back(), &threadStackMutexes.back(),
+                &threadCounters.back()};
+    }
 };
 
-} // namespace EC
+}  // namespace EC
 
 #endif
index 7852154bfc0b7b0daba1b28b6da4a4c2312e1c4c..381f24f1769f7d9840ea8e770f7309176d311b35 100644 (file)
@@ -1458,8 +1458,8 @@ TEST(EC, NestedThreadPoolTasks) {
                 EXPECT_NE(outer_c->x, inner_c->x);
                 EXPECT_NE(outer_c->y, inner_c->y);
             }
-        }, c, false);
+        }, c, true);
     }, &manager, true);
 
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    //std::this_thread::sleep_for(std::chrono::milliseconds(100));
 }
index 9ebcbe18d21ebd5233f787a8095a0f44dbe4211a..ea2f161a1f18445fa54aa258ae4ed7f66baf4f89 100644 (file)
@@ -16,22 +16,22 @@ TEST(ECThreadPool, OneThread) {
 
     p.queueFn(fn, &data);
 
-    p.wakeThreads();
+    p.startThreads();
 
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
 
     ASSERT_EQ(data.load(), 1);
 
     for(unsigned int i = 0; i < 10; ++i) {
         p.queueFn(fn, &data);
     }
-    p.wakeThreads();
+    p.startThreads();
 
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
 
     ASSERT_EQ(data.load(), 11);
 }
@@ -47,22 +47,22 @@ TEST(ECThreadPool, Simple) {
 
     p.queueFn(fn, &data);
 
-    p.wakeThreads();
+    p.startThreads();
 
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
 
     ASSERT_EQ(data.load(), 1);
 
     for(unsigned int i = 0; i < 10; ++i) {
         p.queueFn(fn, &data);
     }
-    p.wakeThreads();
+    p.startThreads();
 
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
 
     ASSERT_EQ(data.load(), 11);
 }
@@ -70,15 +70,15 @@ TEST(ECThreadPool, Simple) {
 TEST(ECThreadPool, QueryCount) {
     {
         OneThreadPool oneP;
-        ASSERT_EQ(1, oneP.getThreadCount());
+        ASSERT_EQ(1, oneP.getMaxThreadCount());
     }
     {
         ThreeThreadPool threeP;
-        ASSERT_EQ(3, threeP.getThreadCount());
+        ASSERT_EQ(3, threeP.getMaxThreadCount());
     }
 }
 
-TEST(ECThreadPool, easyWakeAndWait) {
+TEST(ECThreadPool, easyStartAndWait) {
     std::atomic_int data;
     data.store(0);
     {
@@ -89,7 +89,7 @@ TEST(ECThreadPool, easyWakeAndWait) {
                 atomicInt->fetch_add(1);
             }, &data);
         }
-        oneP.easyWakeAndWait();
+        oneP.easyStartAndWait();
         EXPECT_EQ(20, data.load());
     }
     {
@@ -100,7 +100,7 @@ TEST(ECThreadPool, easyWakeAndWait) {
                 atomicInt->fetch_add(1);
             }, &data);
         }
-        threeP.easyWakeAndWait();
+        threeP.easyStartAndWait();
         EXPECT_EQ(40, data.load());
     }
 }