From 292bffb6365cd042f6df33f9bd3e72bc6d23e21e Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Wed, 15 Jun 2022 21:15:34 +0900 Subject: [PATCH] Impl nested threaded calls More testing is probably required to make sure it works properly. --- .clang-format | 212 +++++++++++++++++++++ src/EC/Manager.hpp | 154 ++++++++++++++-- src/EC/ThreadPool.hpp | 358 ++++++++++++++++++------------------ src/test/ECTest.cpp | 4 +- src/test/ThreadPoolTest.cpp | 26 +-- 5 files changed, 553 insertions(+), 201 deletions(-) create mode 100644 .clang-format diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..5a992df --- /dev/null +++ b/.clang-format @@ -0,0 +1,212 @@ +--- +Language: Cpp +# BasedOnStyle: Google +AccessModifierOffset: -1 +AlignAfterOpenBracket: Align +AlignArrayOfStructures: None +AlignConsecutiveMacros: None +AlignConsecutiveAssignments: None +AlignConsecutiveBitFields: None +AlignConsecutiveDeclarations: None +AlignEscapedNewlines: Left +AlignOperands: Align +AlignTrailingComments: true +AllowAllArgumentsOnNextLine: true +AllowAllConstructorInitializersOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortEnumsOnASingleLine: true +AllowShortBlocksOnASingleLine: Never +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: All +AllowShortLambdasOnASingleLine: All +AllowShortIfStatementsOnASingleLine: WithoutElse +AllowShortLoopsOnASingleLine: true +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: true +AlwaysBreakTemplateDeclarations: Yes +AttributeMacros: + - __capability +BinPackArguments: true +BinPackParameters: true +BraceWrapping: + AfterCaseLabel: false + AfterClass: false + AfterControlStatement: Never + AfterEnum: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + AfterExternBlock: false + BeforeCatch: false + BeforeElse: false + BeforeLambdaBody: false + BeforeWhile: false + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakBeforeBinaryOperators: None +BreakBeforeConceptDeclarations: true +BreakBeforeBraces: Attach +BreakBeforeInheritanceComma: false +BreakInheritanceList: BeforeColon +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BreakConstructorInitializers: BeforeColon +BreakAfterJavaFieldAnnotations: false +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerAllOnOneLineOrOnePerLine: true +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DeriveLineEnding: true +DerivePointerAlignment: true +DisableFormat: false +EmptyLineAfterAccessModifier: Never +EmptyLineBeforeAccessModifier: LogicalBlock +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IfMacros: + - KJ_IF_MAYBE +IncludeBlocks: Regroup +IncludeCategories: + - Regex: '^' + Priority: 2 + SortPriority: 0 + CaseSensitive: false + - Regex: '^<.*\.h>' + Priority: 1 + SortPriority: 0 + CaseSensitive: false + - Regex: '^<.*' + Priority: 2 + SortPriority: 0 + CaseSensitive: false + - Regex: '.*' + Priority: 3 + SortPriority: 0 + CaseSensitive: false +IncludeIsMainRegex: '([-_](test|unittest))?$' +IncludeIsMainSourceRegex: '' +IndentAccessModifiers: false +IndentCaseLabels: true +IndentCaseBlocks: false +IndentGotoLabels: true +IndentPPDirectives: None +IndentExternBlock: AfterExternBlock +IndentRequires: false +IndentWidth: 4 +IndentWrappedFunctionNames: false +InsertTrailingCommas: None +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: false +LambdaBodyIndentation: Signature +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Never +ObjCBlockIndentWidth: 4 +ObjCBreakBeforeNestedBlockParam: true +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 1 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 200 +PenaltyIndentedWhitespace: 0 +PointerAlignment: Left +PPIndentWidth: -1 +RawStringFormats: + - Language: Cpp + Delimiters: + - cc + - CC + - cpp + - Cpp + - CPP + - 'c++' + - 'C++' + CanonicalDelimiter: '' + BasedOnStyle: google + - Language: TextProto + Delimiters: + - pb + - PB + - proto + - PROTO + EnclosingFunctions: + - EqualsProto + - EquivToProto + - PARSE_PARTIAL_TEXT_PROTO + - PARSE_TEST_PROTO + - PARSE_TEXT_PROTO + - ParseTextOrDie + - ParseTextProtoOrDie + - ParseTestProto + - ParsePartialTestProto + CanonicalDelimiter: pb + BasedOnStyle: google +ReferenceAlignment: Pointer +ReflowComments: true +ShortNamespaceLines: 1 +SortIncludes: CaseSensitive +SortJavaStaticImport: Before +SortUsingDeclarations: true +SpaceAfterCStyleCast: false +SpaceAfterLogicalNot: false +SpaceAfterTemplateKeyword: true +SpaceBeforeAssignmentOperators: true +SpaceBeforeCaseColon: false +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: true +SpaceBeforeParens: ControlStatements +SpaceAroundPointerQualifiers: Default +SpaceBeforeRangeBasedForLoopColon: true +SpaceInEmptyBlock: false +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 2 +SpacesInAngles: Never +SpacesInConditionalStatement: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInLineCommentPrefix: + Minimum: 1 + Maximum: -1 +SpacesInParentheses: false +SpacesInSquareBrackets: false +SpaceBeforeSquareBrackets: false +BitFieldColonSpacing: Both +Standard: Auto +StatementAttributeLikeMacros: + - Q_EMIT +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +TabWidth: 4 +UseCRLF: false +UseTab: Never +WhitespaceSensitiveMacros: + - STRINGIZE + - PP_STRINGIZE + - BOOST_PP_STRINGIZE + - NS_SWIFT_NAME + - CF_SWIFT_NAME +... + diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index d82031d..ad8d382 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -7,6 +7,7 @@ #ifndef EC_MANAGER_HPP #define EC_MANAGER_HPP +#include #define EC_INIT_ENTITIES_SIZE 256 #define EC_GROW_SIZE_AMOUNT 256 @@ -107,6 +108,10 @@ namespace EC std::vector deferredDeletions; std::mutex deferredDeletionsMutex; + std::vector idStack; + std::size_t idStackCounter; + std::mutex idStackMutex; + public: // section for "temporary" structures {{{ /// Temporary struct used internally by ThreadPool @@ -196,7 +201,9 @@ namespace EC The default capacity is set with macro EC_INIT_ENTITIES_SIZE, and will grow by amounts of EC_GROW_SIZE_AMOUNT when needed. */ - Manager() + Manager() : + threadPool{}, + idStackCounter(0) { resize(EC_INIT_ENTITIES_SIZE); if(ThreadCount >= 2) { @@ -206,6 +213,14 @@ namespace EC deferringDeletions.store(0); } + ~Manager() { + if (threadPool) { + while(!threadPool->isNotRunning()) { + std::this_thread::sleep_for(std::chrono::microseconds(30)); + } + } + } + private: void resize(std::size_t newCapacity) { @@ -750,6 +765,13 @@ namespace EC void* userData = nullptr, const bool useThreadPool = false) { + std::size_t current_id; + { + // push to idStack "call stack" + std::lock_guard lock(idStackMutex); + current_id = idStackCounter++; + idStack.push_back(current_id); + } deferringDeletions.fetch_add(1); using SignatureComponents = typename EC::Meta::Matching::type; @@ -826,9 +848,21 @@ namespace EC delete data; }, fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } + // pop from idStack "call stack" + do { + { + std::lock_guard lock(idStackMutex); + if (idStack.back() == current_id) { + idStack.pop_back(); + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(15)); + } while (true); + handleDeferredDeletions(); } @@ -878,6 +912,13 @@ namespace EC void* userData = nullptr, const bool useThreadPool = false) { + std::size_t current_id; + { + // push to idStack "call stack" + std::lock_guard lock(idStackMutex); + current_id = idStackCounter++; + idStack.push_back(current_id); + } deferringDeletions.fetch_add(1); using SignatureComponents = typename EC::Meta::Matching::type; @@ -951,9 +992,21 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } + // pop from idStack "call stack" + do { + { + std::lock_guard lock(idStackMutex); + if (idStack.back() == current_id) { + idStack.pop_back(); + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(15)); + } while (true); + handleDeferredDeletions(); } @@ -1099,7 +1152,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } }))); @@ -1180,7 +1233,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } return matchingV; @@ -1483,6 +1536,13 @@ namespace EC void* userData = nullptr, const bool useThreadPool = false) { + std::size_t current_id; + { + // push to idStack "call stack" + std::lock_guard lock(idStackMutex); + current_id = idStackCounter++; + idStack.push_back(current_id); + } deferringDeletions.fetch_add(1); std::vector > multiMatchingEntities(SigList::size); @@ -1565,7 +1625,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } // call functions on matching entities @@ -1630,11 +1690,23 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } } ); + // pop from idStack "call stack" + do { + { + std::lock_guard lock(idStackMutex); + if (idStack.back() == current_id) { + idStack.pop_back(); + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(15)); + } while (true); + handleDeferredDeletions(); } @@ -1694,6 +1766,13 @@ namespace EC void* userData = nullptr, const bool useThreadPool = false) { + std::size_t current_id; + { + // push to idStack "call stack" + std::lock_guard lock(idStackMutex); + current_id = idStackCounter++; + idStack.push_back(current_id); + } deferringDeletions.fetch_add(1); std::vector > multiMatchingEntities( SigList::size); @@ -1776,7 +1855,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } // call functions on matching entities @@ -1846,11 +1925,23 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } } ); + // pop from idStack "call stack" + do { + { + std::lock_guard lock(idStackMutex); + if (idStack.back() == current_id) { + idStack.pop_back(); + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(15)); + } while (true); + handleDeferredDeletions(); } @@ -1879,6 +1970,13 @@ namespace EC void forMatchingSimple(ForMatchingFn fn, void *userData = nullptr, const bool useThreadPool = false) { + std::size_t current_id; + { + // push to idStack "call stack" + std::lock_guard lock(idStackMutex); + current_id = idStackCounter++; + idStack.push_back(current_id); + } deferringDeletions.fetch_add(1); const BitsetType signatureBitset = BitsetType::template generateBitset(); @@ -1934,9 +2032,21 @@ namespace EC delete data; }, fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } + // pop from idStack "call stack" + do { + { + std::lock_guard lock(idStackMutex); + if (idStack.back() == current_id) { + idStack.pop_back(); + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(15)); + } while (true); + handleDeferredDeletions(); } @@ -1959,10 +2069,18 @@ namespace EC may not have as great of a speed-up. */ template - void forMatchingIterable(Iterable iterable, + void forMatchingIterable(Iterable iterable, ForMatchingFn fn, void* userData = nullptr, const bool useThreadPool = false) { + std::size_t current_id; + { + // push to idStack "call stack" + std::lock_guard lock(idStackMutex); + current_id = idStackCounter++; + idStack.push_back(current_id); + } + deferringDeletions.fetch_add(1); if(!useThreadPool || !threadPool) { bool isValid; @@ -2031,9 +2149,21 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->easyWakeAndWait(); + threadPool->easyStartAndWait(); } + // pop from idStack "call stack" + do { + { + std::lock_guard lock(idStackMutex); + if (idStack.back() == current_id) { + idStack.pop_back(); + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(15)); + } while (true); + handleDeferredDeletions(); } }; diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp index 3611954..74f16da 100644 --- a/src/EC/ThreadPool.hpp +++ b/src/EC/ThreadPool.hpp @@ -1,99 +1,52 @@ #ifndef EC_META_SYSTEM_THREADPOOL_HPP #define EC_META_SYSTEM_THREADPOOL_HPP -#include -#include -#include #include -#include -#include -#include -#include -#include #include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #ifndef NDEBUG -# include +#include #endif namespace EC { namespace Internal { - using TPFnType = std::function; - using TPTupleType = std::tuple; - using TPQueueType = std::queue; - - template - void thread_fn(std::atomic_bool *isAlive, - std::condition_variable *cv, - std::mutex *cvMutex, - Internal::TPQueueType *fnQueue, - std::mutex *queueMutex, - std::atomic_int *waitCount) { - bool hasFn = false; - Internal::TPTupleType fnTuple; - while(isAlive->load()) { - hasFn = false; - { - std::lock_guard lock(*queueMutex); - if(!fnQueue->empty()) { - fnTuple = fnQueue->front(); - fnQueue->pop(); - hasFn = true; - } - } - if(hasFn) { - std::get<0>(fnTuple)(std::get<1>(fnTuple)); - continue; - } - - waitCount->fetch_add(1); - { - std::unique_lock lock(*cvMutex); - cv->wait(lock); - } - waitCount->fetch_sub(1); - } - } -} // namespace Internal +using TPFnType = std::function; +using TPTupleType = std::tuple; +using TPQueueType = std::queue; +using ThreadPtr = std::unique_ptr; +using ThreadStackType = std::vector>; +using ThreadStacksType = std::deque; +using ThreadStacksMutexesT = std::deque; +using ThreadCountersT = std::deque; +using PointersT = + std::tuple; +} // namespace Internal /*! \brief Implementation of a Thread Pool. - Note that if SIZE is less than 2, then ThreadPool will not create threads and - run queued functions on the calling thread. + Note that if SIZE is less than 2, then ThreadPool will not create threads + and run queued functions on the calling thread. */ -template +template class ThreadPool { -public: - ThreadPool() { - waitCount.store(0); - extraThreadCount.store(0); - isAlive.store(true); - if(SIZE >= 2) { - for(unsigned int i = 0; i < SIZE; ++i) { - threads.emplace_back(Internal::thread_fn, - &isAlive, - &cv, - &cvMutex, - &fnQueue, - &queueMutex, - &waitCount); - threadsIDs.insert(threads.back().get_id()); - } - } - } + public: + ThreadPool() + : threadStacks{}, threadStackMutexes{}, fnQueue{}, queueMutex{} {} ~ThreadPool() { - if(SIZE >= 2) { - isAlive.store(false); - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - cv.notify_all(); - for(auto &thread : threads) { - thread.join(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(20)); + while (!isNotRunning()) { + std::this_thread::sleep_for(std::chrono::microseconds(30)); } } @@ -104,87 +57,97 @@ public: waiting threads which will start pulling functions from the queue to be called. */ - void queueFn(std::function&& fn, void *ud = nullptr) { + void queueFn(std::function &&fn, void *ud = nullptr) { std::lock_guard lock(queueMutex); fnQueue.emplace(std::make_tuple(fn, ud)); } - /*! - \brief Wakes waiting threads to start running queued functions. + void startThreads() { + if (MAXSIZE >= 2) { + checkStacks(); + auto pointers = newStackEntry(); + Internal::ThreadStackType *threadStack = std::get<0>(pointers); + std::mutex *threadStackMutex = std::get<1>(pointers); + std::atomic_uint *aCounter = std::get<2>(pointers); + for (unsigned int i = 0; i < MAXSIZE; ++i) { + std::thread *newThread = new std::thread( + [](Internal::ThreadStackType *threadStack, + std::mutex *threadStackMutex, + Internal::TPQueueType *fnQueue, std::mutex *queueMutex, + std::atomic_uint *initCount) { + // add id to idStack "call stack" + { + std::lock_guard lock(*threadStackMutex); + threadStack->push_back( + {Internal::ThreadPtr(nullptr), + std::this_thread::get_id()}); + } - If SIZE is less than 2, then this function call will block until all the - queued functions have been executed on the calling thread. + ++(*initCount); - If SIZE is 2 or greater, then this function will return immediately after - waking one or all threads, depending on the given boolean parameter. - */ - void wakeThreads(const bool wakeAll = true) { - if(SIZE >= 2) { - // wake threads to pull functions from queue and run them - if(wakeAll) { - cv.notify_all(); - } else { - cv.notify_one(); - } + // fetch queued fns and execute them + // fnTuples must live until end of function + std::list fnTuples; + do { + bool fnFound = false; + { + std::lock_guard lock(*queueMutex); + if (!fnQueue->empty()) { + fnTuples.emplace_back( + std::move(fnQueue->front())); + fnQueue->pop(); + fnFound = true; + } + } + if (fnFound) { + std::get<0>(fnTuples.back())( + std::get<1>(fnTuples.back())); + } else { + break; + } + } while (true); - // check if all threads are running a task, and spawn a new thread - // if this is the case - Internal::TPTupleType fnTuple; - bool hasFn = false; - if (waitCount.load(std::memory_order_relaxed) == 0) { - std::lock_guard queueLock(queueMutex); - if (!fnQueue.empty()) { - fnTuple = fnQueue.front(); - fnQueue.pop(); - hasFn = true; + // pop id from idStack "call stack" + do { + std::this_thread::sleep_for( + std::chrono::microseconds(15)); + if (initCount->load() != MAXSIZE) { + continue; + } + { + std::lock_guard lock( + *threadStackMutex); + if (std::get<1>(threadStack->back()) == + std::this_thread::get_id()) { + if (!std::get<0>(threadStack->back())) { + continue; + } + std::get<0>(threadStack->back())->detach(); + threadStack->pop_back(); + if (threadStack->empty()) { + initCount->store(0); + } + break; + } + } + } while (true); + }, + threadStack, threadStackMutex, &fnQueue, &queueMutex, + aCounter); + while (aCounter->load() != i + 1) { + std::this_thread::sleep_for(std::chrono::microseconds(15)); } + std::lock_guard stackLock(*threadStackMutex); + std::get<0>(threadStack->at(i)).reset(newThread); } - - if (hasFn) { -#ifndef NDEBUG - std::cout << "Spawning extra thread...\n"; -#endif - extraThreadCount.fetch_add(1); - std::thread newThread = std::thread( - [] (Internal::TPTupleType &&tuple, std::atomic_int *count) { - std::get<0>(tuple)(std::get<1>(tuple)); -#ifndef NDEBUG - std::cout << "Stopping extra thread...\n"; -#endif - count->fetch_sub(1); - }, - std::move(fnTuple), &extraThreadCount); - newThread.detach(); + while (aCounter->load() != MAXSIZE) { + std::this_thread::sleep_for(std::chrono::microseconds(15)); } } else { sequentiallyRunTasks(); } } - /*! - \brief Gets the number of waiting threads. - - If all threads are waiting, this should equal ThreadCount. - - If SIZE is less than 2, then this will always return 0. - */ - int getWaitCount() { - return waitCount.load(std::memory_order_relaxed); - } - - /*! - \brief Returns true if all threads are waiting. - - If SIZE is less than 2, then this will always return true. - */ - bool isAllThreadsWaiting() { - if(SIZE >= 2) { - return waitCount.load(std::memory_order_relaxed) == SIZE; - } else { - return true; - } - } - /*! \brief Returns true if the function queue is empty. */ @@ -196,43 +159,54 @@ public: /*! \brief Returns the ThreadCount that this class was created with. */ - constexpr unsigned int getThreadCount() { - return SIZE; - } + constexpr unsigned int getMaxThreadCount() { return MAXSIZE; } - /*! - \brief Wakes all threads and blocks until all queued tasks are finished. - - If SIZE is less than 2, then this function call will block until all the - queued functions have been executed on the calling thread. - - If SIZE is 2 or greater, then this function will block until all the - queued functions have been executed by the threads in the thread pool. - */ - void easyWakeAndWait() { - if(SIZE >= 2) { + void easyStartAndWait() { + if (MAXSIZE >= 2) { + startThreads(); do { - wakeThreads(); - std::this_thread::sleep_for(std::chrono::microseconds(150)); - } while(!isQueueEmpty() - || (threadsIDs.find(std::this_thread::get_id()) != threadsIDs.end() - && extraThreadCount.load(std::memory_order_relaxed) != 0)); -// } while(!isQueueEmpty() || !isAllThreadsWaiting()); + std::this_thread::sleep_for(std::chrono::microseconds(30)); + + bool isQueueEmpty = false; + { + std::lock_guard lock(queueMutex); + isQueueEmpty = fnQueue.empty(); + } + + if (isQueueEmpty) { + break; + } + } while (true); } else { sequentiallyRunTasks(); } } -private: - std::vector threads; - std::unordered_set threadsIDs; - std::atomic_bool isAlive; - std::condition_variable cv; - std::mutex cvMutex; + bool isNotRunning() { + std::lock_guard lock(dequesMutex); + auto tIter = threadStacks.begin(); + auto mIter = threadStackMutexes.begin(); + while (tIter != threadStacks.end() && + mIter != threadStackMutexes.end()) { + { + std::lock_guard lock(*mIter); + if (!tIter->empty()) { + return false; + } + } + ++tIter; + ++mIter; + } + return true; + } + + private: + Internal::ThreadStacksType threadStacks; + Internal::ThreadStacksMutexesT threadStackMutexes; Internal::TPQueueType fnQueue; std::mutex queueMutex; - std::atomic_int waitCount; - std::atomic_int extraThreadCount; + Internal::ThreadCountersT threadCounters; + std::mutex dequesMutex; void sequentiallyRunTasks() { // pull functions from queue and run them on current thread @@ -241,7 +215,7 @@ private: do { { std::lock_guard lock(queueMutex); - if(!fnQueue.empty()) { + if (!fnQueue.empty()) { hasFn = true; fnTuple = fnQueue.front(); fnQueue.pop(); @@ -249,14 +223,50 @@ private: hasFn = false; } } - if(hasFn) { + if (hasFn) { std::get<0>(fnTuple)(std::get<1>(fnTuple)); } - } while(hasFn); + } while (hasFn); } + void checkStacks() { + std::lock_guard lock(dequesMutex); + if (threadStacks.empty()) { + return; + } + + bool erased = false; + do { + erased = false; + { + std::lock_guard lock(threadStackMutexes.front()); + if (threadStacks.front().empty()) { + threadStacks.pop_front(); + threadCounters.pop_front(); + erased = true; + } + } + if (erased) { + threadStackMutexes.pop_front(); + } else { + break; + } + } while (!threadStacks.empty() && !threadStackMutexes.empty() && + !threadCounters.empty()); + } + + Internal::PointersT newStackEntry() { + std::lock_guard lock(dequesMutex); + threadStacks.emplace_back(); + threadStackMutexes.emplace_back(); + threadCounters.emplace_back(); + threadCounters.back().store(0); + + return {&threadStacks.back(), &threadStackMutexes.back(), + &threadCounters.back()}; + } }; -} // namespace EC +} // namespace EC #endif diff --git a/src/test/ECTest.cpp b/src/test/ECTest.cpp index 7852154..381f24f 100644 --- a/src/test/ECTest.cpp +++ b/src/test/ECTest.cpp @@ -1458,8 +1458,8 @@ TEST(EC, NestedThreadPoolTasks) { EXPECT_NE(outer_c->x, inner_c->x); EXPECT_NE(outer_c->y, inner_c->y); } - }, c, false); + }, c, true); }, &manager, true); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + //std::this_thread::sleep_for(std::chrono::milliseconds(100)); } diff --git a/src/test/ThreadPoolTest.cpp b/src/test/ThreadPoolTest.cpp index 9ebcbe1..ea2f161 100644 --- a/src/test/ThreadPoolTest.cpp +++ b/src/test/ThreadPoolTest.cpp @@ -16,22 +16,22 @@ TEST(ECThreadPool, OneThread) { p.queueFn(fn, &data); - p.wakeThreads(); + p.startThreads(); do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting()); + } while(!p.isQueueEmpty() || !p.isNotRunning()); ASSERT_EQ(data.load(), 1); for(unsigned int i = 0; i < 10; ++i) { p.queueFn(fn, &data); } - p.wakeThreads(); + p.startThreads(); do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting()); + } while(!p.isQueueEmpty() || !p.isNotRunning()); ASSERT_EQ(data.load(), 11); } @@ -47,22 +47,22 @@ TEST(ECThreadPool, Simple) { p.queueFn(fn, &data); - p.wakeThreads(); + p.startThreads(); do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting()); + } while(!p.isQueueEmpty() || !p.isNotRunning()); ASSERT_EQ(data.load(), 1); for(unsigned int i = 0; i < 10; ++i) { p.queueFn(fn, &data); } - p.wakeThreads(); + p.startThreads(); do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting()); + } while(!p.isQueueEmpty() || !p.isNotRunning()); ASSERT_EQ(data.load(), 11); } @@ -70,15 +70,15 @@ TEST(ECThreadPool, Simple) { TEST(ECThreadPool, QueryCount) { { OneThreadPool oneP; - ASSERT_EQ(1, oneP.getThreadCount()); + ASSERT_EQ(1, oneP.getMaxThreadCount()); } { ThreeThreadPool threeP; - ASSERT_EQ(3, threeP.getThreadCount()); + ASSERT_EQ(3, threeP.getMaxThreadCount()); } } -TEST(ECThreadPool, easyWakeAndWait) { +TEST(ECThreadPool, easyStartAndWait) { std::atomic_int data; data.store(0); { @@ -89,7 +89,7 @@ TEST(ECThreadPool, easyWakeAndWait) { atomicInt->fetch_add(1); }, &data); } - oneP.easyWakeAndWait(); + oneP.easyStartAndWait(); EXPECT_EQ(20, data.load()); } { @@ -100,7 +100,7 @@ TEST(ECThreadPool, easyWakeAndWait) { atomicInt->fetch_add(1); }, &data); } - threeP.easyWakeAndWait(); + threeP.easyStartAndWait(); EXPECT_EQ(40, data.load()); } }