More testing is probably required to make sure it works properly.
--- /dev/null
+---
+Language: Cpp
+# BasedOnStyle: Google
+AccessModifierOffset: -1
+AlignAfterOpenBracket: Align
+AlignArrayOfStructures: None
+AlignConsecutiveMacros: None
+AlignConsecutiveAssignments: None
+AlignConsecutiveBitFields: None
+AlignConsecutiveDeclarations: None
+AlignEscapedNewlines: Left
+AlignOperands: Align
+AlignTrailingComments: true
+AllowAllArgumentsOnNextLine: true
+AllowAllConstructorInitializersOnNextLine: true
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortEnumsOnASingleLine: true
+AllowShortBlocksOnASingleLine: Never
+AllowShortCaseLabelsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: All
+AllowShortLambdasOnASingleLine: All
+AllowShortIfStatementsOnASingleLine: WithoutElse
+AllowShortLoopsOnASingleLine: true
+AlwaysBreakAfterDefinitionReturnType: None
+AlwaysBreakAfterReturnType: None
+AlwaysBreakBeforeMultilineStrings: true
+AlwaysBreakTemplateDeclarations: Yes
+AttributeMacros:
+ - __capability
+BinPackArguments: true
+BinPackParameters: true
+BraceWrapping:
+ AfterCaseLabel: false
+ AfterClass: false
+ AfterControlStatement: Never
+ AfterEnum: false
+ AfterFunction: false
+ AfterNamespace: false
+ AfterObjCDeclaration: false
+ AfterStruct: false
+ AfterUnion: false
+ AfterExternBlock: false
+ BeforeCatch: false
+ BeforeElse: false
+ BeforeLambdaBody: false
+ BeforeWhile: false
+ IndentBraces: false
+ SplitEmptyFunction: true
+ SplitEmptyRecord: true
+ SplitEmptyNamespace: true
+BreakBeforeBinaryOperators: None
+BreakBeforeConceptDeclarations: true
+BreakBeforeBraces: Attach
+BreakBeforeInheritanceComma: false
+BreakInheritanceList: BeforeColon
+BreakBeforeTernaryOperators: true
+BreakConstructorInitializersBeforeComma: false
+BreakConstructorInitializers: BeforeColon
+BreakAfterJavaFieldAnnotations: false
+BreakStringLiterals: true
+ColumnLimit: 80
+CommentPragmas: '^ IWYU pragma:'
+CompactNamespaces: false
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+ConstructorInitializerIndentWidth: 4
+ContinuationIndentWidth: 4
+Cpp11BracedListStyle: true
+DeriveLineEnding: true
+DerivePointerAlignment: true
+DisableFormat: false
+EmptyLineAfterAccessModifier: Never
+EmptyLineBeforeAccessModifier: LogicalBlock
+ExperimentalAutoDetectBinPacking: false
+FixNamespaceComments: true
+ForEachMacros:
+ - foreach
+ - Q_FOREACH
+ - BOOST_FOREACH
+IfMacros:
+ - KJ_IF_MAYBE
+IncludeBlocks: Regroup
+IncludeCategories:
+ - Regex: '^<ext/.*\.h>'
+ Priority: 2
+ SortPriority: 0
+ CaseSensitive: false
+ - Regex: '^<.*\.h>'
+ Priority: 1
+ SortPriority: 0
+ CaseSensitive: false
+ - Regex: '^<.*'
+ Priority: 2
+ SortPriority: 0
+ CaseSensitive: false
+ - Regex: '.*'
+ Priority: 3
+ SortPriority: 0
+ CaseSensitive: false
+IncludeIsMainRegex: '([-_](test|unittest))?$'
+IncludeIsMainSourceRegex: ''
+IndentAccessModifiers: false
+IndentCaseLabels: true
+IndentCaseBlocks: false
+IndentGotoLabels: true
+IndentPPDirectives: None
+IndentExternBlock: AfterExternBlock
+IndentRequires: false
+IndentWidth: 4
+IndentWrappedFunctionNames: false
+InsertTrailingCommas: None
+JavaScriptQuotes: Leave
+JavaScriptWrapImports: true
+KeepEmptyLinesAtTheStartOfBlocks: false
+LambdaBodyIndentation: Signature
+MacroBlockBegin: ''
+MacroBlockEnd: ''
+MaxEmptyLinesToKeep: 1
+NamespaceIndentation: None
+ObjCBinPackProtocolList: Never
+ObjCBlockIndentWidth: 4
+ObjCBreakBeforeNestedBlockParam: true
+ObjCSpaceAfterProperty: false
+ObjCSpaceBeforeProtocolList: true
+PenaltyBreakAssignment: 2
+PenaltyBreakBeforeFirstCallParameter: 1
+PenaltyBreakComment: 300
+PenaltyBreakFirstLessLess: 120
+PenaltyBreakString: 1000
+PenaltyBreakTemplateDeclaration: 10
+PenaltyExcessCharacter: 1000000
+PenaltyReturnTypeOnItsOwnLine: 200
+PenaltyIndentedWhitespace: 0
+PointerAlignment: Left
+PPIndentWidth: -1
+RawStringFormats:
+ - Language: Cpp
+ Delimiters:
+ - cc
+ - CC
+ - cpp
+ - Cpp
+ - CPP
+ - 'c++'
+ - 'C++'
+ CanonicalDelimiter: ''
+ BasedOnStyle: google
+ - Language: TextProto
+ Delimiters:
+ - pb
+ - PB
+ - proto
+ - PROTO
+ EnclosingFunctions:
+ - EqualsProto
+ - EquivToProto
+ - PARSE_PARTIAL_TEXT_PROTO
+ - PARSE_TEST_PROTO
+ - PARSE_TEXT_PROTO
+ - ParseTextOrDie
+ - ParseTextProtoOrDie
+ - ParseTestProto
+ - ParsePartialTestProto
+ CanonicalDelimiter: pb
+ BasedOnStyle: google
+ReferenceAlignment: Pointer
+ReflowComments: true
+ShortNamespaceLines: 1
+SortIncludes: CaseSensitive
+SortJavaStaticImport: Before
+SortUsingDeclarations: true
+SpaceAfterCStyleCast: false
+SpaceAfterLogicalNot: false
+SpaceAfterTemplateKeyword: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeCaseColon: false
+SpaceBeforeCpp11BracedList: false
+SpaceBeforeCtorInitializerColon: true
+SpaceBeforeInheritanceColon: true
+SpaceBeforeParens: ControlStatements
+SpaceAroundPointerQualifiers: Default
+SpaceBeforeRangeBasedForLoopColon: true
+SpaceInEmptyBlock: false
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 2
+SpacesInAngles: Never
+SpacesInConditionalStatement: false
+SpacesInContainerLiterals: true
+SpacesInCStyleCastParentheses: false
+SpacesInLineCommentPrefix:
+ Minimum: 1
+ Maximum: -1
+SpacesInParentheses: false
+SpacesInSquareBrackets: false
+SpaceBeforeSquareBrackets: false
+BitFieldColonSpacing: Both
+Standard: Auto
+StatementAttributeLikeMacros:
+ - Q_EMIT
+StatementMacros:
+ - Q_UNUSED
+ - QT_REQUIRE_VERSION
+TabWidth: 4
+UseCRLF: false
+UseTab: Never
+WhitespaceSensitiveMacros:
+ - STRINGIZE
+ - PP_STRINGIZE
+ - BOOST_PP_STRINGIZE
+ - NS_SWIFT_NAME
+ - CF_SWIFT_NAME
+...
+
#ifndef EC_MANAGER_HPP
#define EC_MANAGER_HPP
+#include <chrono>
#define EC_INIT_ENTITIES_SIZE 256
#define EC_GROW_SIZE_AMOUNT 256
std::vector<std::size_t> deferredDeletions;
std::mutex deferredDeletionsMutex;
+ std::vector<std::size_t> idStack;
+ std::size_t idStackCounter;
+ std::mutex idStackMutex;
+
public:
// section for "temporary" structures {{{
/// Temporary struct used internally by ThreadPool
The default capacity is set with macro EC_INIT_ENTITIES_SIZE,
and will grow by amounts of EC_GROW_SIZE_AMOUNT when needed.
*/
- Manager()
+ Manager() :
+ threadPool{},
+ idStackCounter(0)
{
resize(EC_INIT_ENTITIES_SIZE);
if(ThreadCount >= 2) {
deferringDeletions.store(0);
}
+ ~Manager() {
+ if (threadPool) {
+ while(!threadPool->isNotRunning()) {
+ std::this_thread::sleep_for(std::chrono::microseconds(30));
+ }
+ }
+ }
+
private:
void resize(std::size_t newCapacity)
{
void* userData = nullptr,
const bool useThreadPool = false)
{
+ std::size_t current_id;
+ {
+ // push to idStack "call stack"
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ current_id = idStackCounter++;
+ idStack.push_back(current_id);
+ }
deferringDeletions.fetch_add(1);
using SignatureComponents =
typename EC::Meta::Matching<Signature, ComponentsList>::type;
delete data;
}, fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
+ // pop from idStack "call stack"
+ do {
+ {
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ if (idStack.back() == current_id) {
+ idStack.pop_back();
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
+ } while (true);
+
handleDeferredDeletions();
}
void* userData = nullptr,
const bool useThreadPool = false)
{
+ std::size_t current_id;
+ {
+ // push to idStack "call stack"
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ current_id = idStackCounter++;
+ idStack.push_back(current_id);
+ }
deferringDeletions.fetch_add(1);
using SignatureComponents =
typename EC::Meta::Matching<Signature, ComponentsList>::type;
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
+ // pop from idStack "call stack"
+ do {
+ {
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ if (idStack.back() == current_id) {
+ idStack.pop_back();
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
+ } while (true);
+
handleDeferredDeletions();
}
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
})));
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
return matchingV;
void* userData = nullptr,
const bool useThreadPool = false)
{
+ std::size_t current_id;
+ {
+ // push to idStack "call stack"
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ current_id = idStackCounter++;
+ idStack.push_back(current_id);
+ }
deferringDeletions.fetch_add(1);
std::vector<std::vector<std::size_t> >
multiMatchingEntities(SigList::size);
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
// call functions on matching entities
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
}
);
+ // pop from idStack "call stack"
+ do {
+ {
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ if (idStack.back() == current_id) {
+ idStack.pop_back();
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
+ } while (true);
+
handleDeferredDeletions();
}
void* userData = nullptr,
const bool useThreadPool = false)
{
+ std::size_t current_id;
+ {
+ // push to idStack "call stack"
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ current_id = idStackCounter++;
+ idStack.push_back(current_id);
+ }
deferringDeletions.fetch_add(1);
std::vector<std::vector<std::size_t> > multiMatchingEntities(
SigList::size);
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
// call functions on matching entities
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
}
);
+ // pop from idStack "call stack"
+ do {
+ {
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ if (idStack.back() == current_id) {
+ idStack.pop_back();
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
+ } while (true);
+
handleDeferredDeletions();
}
void forMatchingSimple(ForMatchingFn fn,
void *userData = nullptr,
const bool useThreadPool = false) {
+ std::size_t current_id;
+ {
+ // push to idStack "call stack"
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ current_id = idStackCounter++;
+ idStack.push_back(current_id);
+ }
deferringDeletions.fetch_add(1);
const BitsetType signatureBitset =
BitsetType::template generateBitset<Signature>();
delete data;
}, fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
+ // pop from idStack "call stack"
+ do {
+ {
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ if (idStack.back() == current_id) {
+ idStack.pop_back();
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
+ } while (true);
+
handleDeferredDeletions();
}
may not have as great of a speed-up.
*/
template <typename Iterable>
- void forMatchingIterable(Iterable iterable,
+ void forMatchingIterable(Iterable iterable,
ForMatchingFn fn,
void* userData = nullptr,
const bool useThreadPool = false) {
+ std::size_t current_id;
+ {
+ // push to idStack "call stack"
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ current_id = idStackCounter++;
+ idStack.push_back(current_id);
+ }
+
deferringDeletions.fetch_add(1);
if(!useThreadPool || !threadPool) {
bool isValid;
}
}, &fnDataAr[i]);
}
- threadPool->easyWakeAndWait();
+ threadPool->easyStartAndWait();
}
+ // pop from idStack "call stack"
+ do {
+ {
+ std::lock_guard<std::mutex> lock(idStackMutex);
+ if (idStack.back() == current_id) {
+ idStack.pop_back();
+ break;
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
+ } while (true);
+
handleDeferredDeletions();
}
};
#ifndef EC_META_SYSTEM_THREADPOOL_HPP
#define EC_META_SYSTEM_THREADPOOL_HPP
-#include <type_traits>
-#include <vector>
-#include <thread>
#include <atomic>
+#include <chrono>
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
#include <mutex>
-#include <condition_variable>
#include <queue>
-#include <functional>
+#include <thread>
#include <tuple>
-#include <chrono>
-#include <unordered_set>
+#include <vector>
#ifndef NDEBUG
-# include <iostream>
+#include <iostream>
#endif
namespace EC {
namespace Internal {
- using TPFnType = std::function<void(void*)>;
- using TPTupleType = std::tuple<TPFnType, void*>;
- using TPQueueType = std::queue<TPTupleType>;
-
- template <unsigned int SIZE>
- void thread_fn(std::atomic_bool *isAlive,
- std::condition_variable *cv,
- std::mutex *cvMutex,
- Internal::TPQueueType *fnQueue,
- std::mutex *queueMutex,
- std::atomic_int *waitCount) {
- bool hasFn = false;
- Internal::TPTupleType fnTuple;
- while(isAlive->load()) {
- hasFn = false;
- {
- std::lock_guard<std::mutex> lock(*queueMutex);
- if(!fnQueue->empty()) {
- fnTuple = fnQueue->front();
- fnQueue->pop();
- hasFn = true;
- }
- }
- if(hasFn) {
- std::get<0>(fnTuple)(std::get<1>(fnTuple));
- continue;
- }
-
- waitCount->fetch_add(1);
- {
- std::unique_lock<std::mutex> lock(*cvMutex);
- cv->wait(lock);
- }
- waitCount->fetch_sub(1);
- }
- }
-} // namespace Internal
+using TPFnType = std::function<void(void *)>;
+using TPTupleType = std::tuple<TPFnType, void *>;
+using TPQueueType = std::queue<TPTupleType>;
+using ThreadPtr = std::unique_ptr<std::thread>;
+using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;
+using ThreadStacksType = std::deque<ThreadStackType>;
+using ThreadStacksMutexesT = std::deque<std::mutex>;
+using ThreadCountersT = std::deque<std::atomic_uint>;
+using PointersT =
+ std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *>;
+} // namespace Internal
/*!
\brief Implementation of a Thread Pool.
- Note that if SIZE is less than 2, then ThreadPool will not create threads and
- run queued functions on the calling thread.
+ Note that if SIZE is less than 2, then ThreadPool will not create threads
+ and run queued functions on the calling thread.
*/
-template <unsigned int SIZE>
+template <unsigned int MAXSIZE>
class ThreadPool {
-public:
- ThreadPool() {
- waitCount.store(0);
- extraThreadCount.store(0);
- isAlive.store(true);
- if(SIZE >= 2) {
- for(unsigned int i = 0; i < SIZE; ++i) {
- threads.emplace_back(Internal::thread_fn<SIZE>,
- &isAlive,
- &cv,
- &cvMutex,
- &fnQueue,
- &queueMutex,
- &waitCount);
- threadsIDs.insert(threads.back().get_id());
- }
- }
- }
+ public:
+ ThreadPool()
+ : threadStacks{}, threadStackMutexes{}, fnQueue{}, queueMutex{} {}
~ThreadPool() {
- if(SIZE >= 2) {
- isAlive.store(false);
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
- cv.notify_all();
- for(auto &thread : threads) {
- thread.join();
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
+ while (!isNotRunning()) {
+ std::this_thread::sleep_for(std::chrono::microseconds(30));
}
}
waiting threads which will start pulling functions from the queue to be
called.
*/
- void queueFn(std::function<void(void*)>&& fn, void *ud = nullptr) {
+ void queueFn(std::function<void(void *)> &&fn, void *ud = nullptr) {
std::lock_guard<std::mutex> lock(queueMutex);
fnQueue.emplace(std::make_tuple(fn, ud));
}
- /*!
- \brief Wakes waiting threads to start running queued functions.
+ void startThreads() {
+ if (MAXSIZE >= 2) {
+ checkStacks();
+ auto pointers = newStackEntry();
+ Internal::ThreadStackType *threadStack = std::get<0>(pointers);
+ std::mutex *threadStackMutex = std::get<1>(pointers);
+ std::atomic_uint *aCounter = std::get<2>(pointers);
+ for (unsigned int i = 0; i < MAXSIZE; ++i) {
+ std::thread *newThread = new std::thread(
+ [](Internal::ThreadStackType *threadStack,
+ std::mutex *threadStackMutex,
+ Internal::TPQueueType *fnQueue, std::mutex *queueMutex,
+ std::atomic_uint *initCount) {
+ // add id to idStack "call stack"
+ {
+ std::lock_guard<std::mutex> lock(*threadStackMutex);
+ threadStack->push_back(
+ {Internal::ThreadPtr(nullptr),
+ std::this_thread::get_id()});
+ }
- If SIZE is less than 2, then this function call will block until all the
- queued functions have been executed on the calling thread.
+ ++(*initCount);
- If SIZE is 2 or greater, then this function will return immediately after
- waking one or all threads, depending on the given boolean parameter.
- */
- void wakeThreads(const bool wakeAll = true) {
- if(SIZE >= 2) {
- // wake threads to pull functions from queue and run them
- if(wakeAll) {
- cv.notify_all();
- } else {
- cv.notify_one();
- }
+ // fetch queued fns and execute them
+ // fnTuples must live until end of function
+ std::list<Internal::TPTupleType> fnTuples;
+ do {
+ bool fnFound = false;
+ {
+ std::lock_guard<std::mutex> lock(*queueMutex);
+ if (!fnQueue->empty()) {
+ fnTuples.emplace_back(
+ std::move(fnQueue->front()));
+ fnQueue->pop();
+ fnFound = true;
+ }
+ }
+ if (fnFound) {
+ std::get<0>(fnTuples.back())(
+ std::get<1>(fnTuples.back()));
+ } else {
+ break;
+ }
+ } while (true);
- // check if all threads are running a task, and spawn a new thread
- // if this is the case
- Internal::TPTupleType fnTuple;
- bool hasFn = false;
- if (waitCount.load(std::memory_order_relaxed) == 0) {
- std::lock_guard<std::mutex> queueLock(queueMutex);
- if (!fnQueue.empty()) {
- fnTuple = fnQueue.front();
- fnQueue.pop();
- hasFn = true;
+ // pop id from idStack "call stack"
+ do {
+ std::this_thread::sleep_for(
+ std::chrono::microseconds(15));
+ if (initCount->load() != MAXSIZE) {
+ continue;
+ }
+ {
+ std::lock_guard<std::mutex> lock(
+ *threadStackMutex);
+ if (std::get<1>(threadStack->back()) ==
+ std::this_thread::get_id()) {
+ if (!std::get<0>(threadStack->back())) {
+ continue;
+ }
+ std::get<0>(threadStack->back())->detach();
+ threadStack->pop_back();
+ if (threadStack->empty()) {
+ initCount->store(0);
+ }
+ break;
+ }
+ }
+ } while (true);
+ },
+ threadStack, threadStackMutex, &fnQueue, &queueMutex,
+ aCounter);
+ while (aCounter->load() != i + 1) {
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
}
+ std::lock_guard<std::mutex> stackLock(*threadStackMutex);
+ std::get<0>(threadStack->at(i)).reset(newThread);
}
-
- if (hasFn) {
-#ifndef NDEBUG
- std::cout << "Spawning extra thread...\n";
-#endif
- extraThreadCount.fetch_add(1);
- std::thread newThread = std::thread(
- [] (Internal::TPTupleType &&tuple, std::atomic_int *count) {
- std::get<0>(tuple)(std::get<1>(tuple));
-#ifndef NDEBUG
- std::cout << "Stopping extra thread...\n";
-#endif
- count->fetch_sub(1);
- },
- std::move(fnTuple), &extraThreadCount);
- newThread.detach();
+ while (aCounter->load() != MAXSIZE) {
+ std::this_thread::sleep_for(std::chrono::microseconds(15));
}
} else {
sequentiallyRunTasks();
}
}
- /*!
- \brief Gets the number of waiting threads.
-
- If all threads are waiting, this should equal ThreadCount.
-
- If SIZE is less than 2, then this will always return 0.
- */
- int getWaitCount() {
- return waitCount.load(std::memory_order_relaxed);
- }
-
- /*!
- \brief Returns true if all threads are waiting.
-
- If SIZE is less than 2, then this will always return true.
- */
- bool isAllThreadsWaiting() {
- if(SIZE >= 2) {
- return waitCount.load(std::memory_order_relaxed) == SIZE;
- } else {
- return true;
- }
- }
-
/*!
\brief Returns true if the function queue is empty.
*/
/*!
\brief Returns the ThreadCount that this class was created with.
*/
- constexpr unsigned int getThreadCount() {
- return SIZE;
- }
+ constexpr unsigned int getMaxThreadCount() { return MAXSIZE; }
- /*!
- \brief Wakes all threads and blocks until all queued tasks are finished.
+ void easyStartAndWait() {
+ if (MAXSIZE >= 2) {
+ startThreads();
+ do {
+ std::this_thread::sleep_for(std::chrono::microseconds(30));
- If SIZE is less than 2, then this function call will block until all the
- queued functions have been executed on the calling thread.
+ bool isQueueEmpty = false;
+ {
+ std::lock_guard<std::mutex> lock(queueMutex);
+ isQueueEmpty = fnQueue.empty();
+ }
- If SIZE is 2 or greater, then this function will block until all the
- queued functions have been executed by the threads in the thread pool.
- */
- void easyWakeAndWait() {
- if(SIZE >= 2) {
- do {
- wakeThreads();
- std::this_thread::sleep_for(std::chrono::microseconds(150));
- } while(!isQueueEmpty()
- || (threadsIDs.find(std::this_thread::get_id()) != threadsIDs.end()
- && extraThreadCount.load(std::memory_order_relaxed) != 0));
-// } while(!isQueueEmpty() || !isAllThreadsWaiting());
+ if (isQueueEmpty) {
+ break;
+ }
+ } while (true);
} else {
sequentiallyRunTasks();
}
}
-private:
- std::vector<std::thread> threads;
- std::unordered_set<std::thread::id> threadsIDs;
- std::atomic_bool isAlive;
- std::condition_variable cv;
- std::mutex cvMutex;
+ bool isNotRunning() {
+ std::lock_guard<std::mutex> lock(dequesMutex);
+ auto tIter = threadStacks.begin();
+ auto mIter = threadStackMutexes.begin();
+ while (tIter != threadStacks.end() &&
+ mIter != threadStackMutexes.end()) {
+ {
+ std::lock_guard<std::mutex> lock(*mIter);
+ if (!tIter->empty()) {
+ return false;
+ }
+ }
+ ++tIter;
+ ++mIter;
+ }
+ return true;
+ }
+
+ private:
+ Internal::ThreadStacksType threadStacks;
+ Internal::ThreadStacksMutexesT threadStackMutexes;
Internal::TPQueueType fnQueue;
std::mutex queueMutex;
- std::atomic_int waitCount;
- std::atomic_int extraThreadCount;
+ Internal::ThreadCountersT threadCounters;
+ std::mutex dequesMutex;
void sequentiallyRunTasks() {
// pull functions from queue and run them on current thread
do {
{
std::lock_guard<std::mutex> lock(queueMutex);
- if(!fnQueue.empty()) {
+ if (!fnQueue.empty()) {
hasFn = true;
fnTuple = fnQueue.front();
fnQueue.pop();
hasFn = false;
}
}
- if(hasFn) {
+ if (hasFn) {
std::get<0>(fnTuple)(std::get<1>(fnTuple));
}
- } while(hasFn);
+ } while (hasFn);
}
+ void checkStacks() {
+ std::lock_guard<std::mutex> lock(dequesMutex);
+ if (threadStacks.empty()) {
+ return;
+ }
+
+ bool erased = false;
+ do {
+ erased = false;
+ {
+ std::lock_guard<std::mutex> lock(threadStackMutexes.front());
+ if (threadStacks.front().empty()) {
+ threadStacks.pop_front();
+ threadCounters.pop_front();
+ erased = true;
+ }
+ }
+ if (erased) {
+ threadStackMutexes.pop_front();
+ } else {
+ break;
+ }
+ } while (!threadStacks.empty() && !threadStackMutexes.empty() &&
+ !threadCounters.empty());
+ }
+
+ Internal::PointersT newStackEntry() {
+ std::lock_guard<std::mutex> lock(dequesMutex);
+ threadStacks.emplace_back();
+ threadStackMutexes.emplace_back();
+ threadCounters.emplace_back();
+ threadCounters.back().store(0);
+
+ return {&threadStacks.back(), &threadStackMutexes.back(),
+ &threadCounters.back()};
+ }
};
-} // namespace EC
+} // namespace EC
#endif
EXPECT_NE(outer_c->x, inner_c->x);
EXPECT_NE(outer_c->y, inner_c->y);
}
- }, c, false);
+ }, c, true);
}, &manager, true);
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ //std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
p.queueFn(fn, &data);
- p.wakeThreads();
+ p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
- } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+ } while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 1);
for(unsigned int i = 0; i < 10; ++i) {
p.queueFn(fn, &data);
}
- p.wakeThreads();
+ p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
- } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+ } while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 11);
}
p.queueFn(fn, &data);
- p.wakeThreads();
+ p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
- } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+ } while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 1);
for(unsigned int i = 0; i < 10; ++i) {
p.queueFn(fn, &data);
}
- p.wakeThreads();
+ p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
- } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+ } while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 11);
}
TEST(ECThreadPool, QueryCount) {
{
OneThreadPool oneP;
- ASSERT_EQ(1, oneP.getThreadCount());
+ ASSERT_EQ(1, oneP.getMaxThreadCount());
}
{
ThreeThreadPool threeP;
- ASSERT_EQ(3, threeP.getThreadCount());
+ ASSERT_EQ(3, threeP.getMaxThreadCount());
}
}
-TEST(ECThreadPool, easyWakeAndWait) {
+TEST(ECThreadPool, easyStartAndWait) {
std::atomic_int data;
data.store(0);
{
atomicInt->fetch_add(1);
}, &data);
}
- oneP.easyWakeAndWait();
+ oneP.easyStartAndWait();
EXPECT_EQ(20, data.load());
}
{
atomicInt->fetch_add(1);
}, &data);
}
- threeP.easyWakeAndWait();
+ threeP.easyStartAndWait();
EXPECT_EQ(40, data.load());
}
}