Impl nested threaded calls

More testing is probably required to make sure it works properly.
Stephen Seo 2022-06-15 21:15:34 +09:00
parent 3286aa5a74
commit 292bffb636
5 changed files with 553 additions and 201 deletions
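
In short, this change lets a threaded forMatching* call be made from inside a function that is already running on the Manager's thread pool: the Manager tracks nesting with an idStack "call stack" guarded by a mutex, and the ThreadPool now creates a fresh stack of worker threads for each startThreads() call instead of waking a fixed set. A minimal usage sketch follows; it is not part of the commit, the component type, include path, and Manager template parameters are illustrative assumptions, and the callback shape (entity id, user data, component pointer) mirrors the updated NestedThreadPoolTasks test.

    // Sketch only: outer and inner iteration both request the thread pool.
    // Before this commit the inner call had to pass `false`.
    #include <EC/Manager.hpp>  // assumed include path

    struct CPos { int x = 0, y = 0; };  // illustrative component

    using Components = EC::Meta::TypeList<CPos>;
    using Tags       = EC::Meta::TypeList<>;
    using ManagerT   = EC::Manager<Components, Tags, 4>;  // thread count assumed

    void nestedPass(ManagerT &manager) {
        manager.forMatchingSignature<EC::Meta::TypeList<CPos>>(
            [](std::size_t /*id*/, void *ud, CPos *outer_c) {
                auto *mgr = static_cast<ManagerT *>(ud);
                // Nested threaded call from inside a thread-pool task.
                mgr->forMatchingSignature<EC::Meta::TypeList<CPos>>(
                    [](std::size_t /*id*/, void *ud, CPos *inner_c) {
                        auto *outer_c = static_cast<CPos *>(ud);
                        if (outer_c != inner_c) {
                            inner_c->x += outer_c->x;  // arbitrary work
                        }
                    },
                    outer_c,  // pass the outer component as userData
                    true);    // inner call also uses the thread pool
            },
            &manager,
            true);            // outer call uses the thread pool
    }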

.clang-format Normal file

@@ -0,0 +1,212 @@
---
Language: Cpp
# BasedOnStyle: Google
AccessModifierOffset: -1
AlignAfterOpenBracket: Align
AlignArrayOfStructures: None
AlignConsecutiveMacros: None
AlignConsecutiveAssignments: None
AlignConsecutiveBitFields: None
AlignConsecutiveDeclarations: None
AlignEscapedNewlines: Left
AlignOperands: Align
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: true
AllowAllConstructorInitializersOnNextLine: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortEnumsOnASingleLine: true
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: All
AllowShortLambdasOnASingleLine: All
AllowShortIfStatementsOnASingleLine: WithoutElse
AllowShortLoopsOnASingleLine: true
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: Yes
AttributeMacros:
- __capability
BinPackArguments: true
BinPackParameters: true
BraceWrapping:
  AfterCaseLabel: false
  AfterClass: false
  AfterControlStatement: Never
  AfterEnum: false
  AfterFunction: false
  AfterNamespace: false
  AfterObjCDeclaration: false
  AfterStruct: false
  AfterUnion: false
  AfterExternBlock: false
  BeforeCatch: false
  BeforeElse: false
  BeforeLambdaBody: false
  BeforeWhile: false
  IndentBraces: false
  SplitEmptyFunction: true
  SplitEmptyRecord: true
  SplitEmptyNamespace: true
BreakBeforeBinaryOperators: None
BreakBeforeConceptDeclarations: true
BreakBeforeBraces: Attach
BreakBeforeInheritanceComma: false
BreakInheritanceList: BeforeColon
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: false
BreakConstructorInitializers: BeforeColon
BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: true
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DeriveLineEnding: true
DerivePointerAlignment: true
DisableFormat: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: LogicalBlock
ExperimentalAutoDetectBinPacking: false
FixNamespaceComments: true
ForEachMacros:
- foreach
- Q_FOREACH
- BOOST_FOREACH
IfMacros:
- KJ_IF_MAYBE
IncludeBlocks: Regroup
IncludeCategories:
  - Regex: '^<ext/.*\.h>'
    Priority: 2
    SortPriority: 0
    CaseSensitive: false
  - Regex: '^<.*\.h>'
    Priority: 1
    SortPriority: 0
    CaseSensitive: false
  - Regex: '^<.*'
    Priority: 2
    SortPriority: 0
    CaseSensitive: false
  - Regex: '.*'
    Priority: 3
    SortPriority: 0
    CaseSensitive: false
IncludeIsMainRegex: '([-_](test|unittest))?$'
IncludeIsMainSourceRegex: ''
IndentAccessModifiers: false
IndentCaseLabels: true
IndentCaseBlocks: false
IndentGotoLabels: true
IndentPPDirectives: None
IndentExternBlock: AfterExternBlock
IndentRequires: false
IndentWidth: 4
IndentWrappedFunctionNames: false
InsertTrailingCommas: None
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: false
LambdaBodyIndentation: Signature
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCBinPackProtocolList: Never
ObjCBlockIndentWidth: 4
ObjCBreakBeforeNestedBlockParam: true
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: true
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PenaltyIndentedWhitespace: 0
PointerAlignment: Left
PPIndentWidth: -1
RawStringFormats:
  - Language: Cpp
    Delimiters:
      - cc
      - CC
      - cpp
      - Cpp
      - CPP
      - 'c++'
      - 'C++'
    CanonicalDelimiter: ''
    BasedOnStyle: google
  - Language: TextProto
    Delimiters:
      - pb
      - PB
      - proto
      - PROTO
    EnclosingFunctions:
      - EqualsProto
      - EquivToProto
      - PARSE_PARTIAL_TEXT_PROTO
      - PARSE_TEST_PROTO
      - PARSE_TEXT_PROTO
      - ParseTextOrDie
      - ParseTextProtoOrDie
      - ParseTestProto
      - ParsePartialTestProto
    CanonicalDelimiter: pb
    BasedOnStyle: google
ReferenceAlignment: Pointer
ReflowComments: true
ShortNamespaceLines: 1
SortIncludes: CaseSensitive
SortJavaStaticImport: Before
SortUsingDeclarations: true
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
SpaceBeforeCaseColon: false
SpaceBeforeCpp11BracedList: false
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
SpaceAroundPointerQualifiers: Default
SpaceBeforeRangeBasedForLoopColon: true
SpaceInEmptyBlock: false
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 2
SpacesInAngles: Never
SpacesInConditionalStatement: false
SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInLineCommentPrefix:
  Minimum: 1
  Maximum: -1
SpacesInParentheses: false
SpacesInSquareBrackets: false
SpaceBeforeSquareBrackets: false
BitFieldColonSpacing: Both
Standard: Auto
StatementAttributeLikeMacros:
- Q_EMIT
StatementMacros:
- Q_UNUSED
- QT_REQUIRE_VERSION
TabWidth: 4
UseCRLF: false
UseTab: Never
WhitespaceSensitiveMacros:
- STRINGIZE
- PP_STRINGIZE
- BOOST_PP_STRINGIZE
- NS_SWIFT_NAME
- CF_SWIFT_NAME
...


@@ -7,6 +7,7 @@
 #ifndef EC_MANAGER_HPP
 #define EC_MANAGER_HPP
+#include <chrono>
 #define EC_INIT_ENTITIES_SIZE 256
 #define EC_GROW_SIZE_AMOUNT 256
@@ -107,6 +108,10 @@ namespace EC
     std::vector<std::size_t> deferredDeletions;
     std::mutex deferredDeletionsMutex;
+    std::vector<std::size_t> idStack;
+    std::size_t idStackCounter;
+    std::mutex idStackMutex;
 public:
     // section for "temporary" structures {{{
     /// Temporary struct used internally by ThreadPool
@@ -196,7 +201,9 @@ namespace EC
         The default capacity is set with macro EC_INIT_ENTITIES_SIZE,
         and will grow by amounts of EC_GROW_SIZE_AMOUNT when needed.
     */
-    Manager()
+    Manager() :
+        threadPool{},
+        idStackCounter(0)
     {
         resize(EC_INIT_ENTITIES_SIZE);
         if(ThreadCount >= 2) {
@@ -206,6 +213,14 @@ namespace EC
         deferringDeletions.store(0);
     }
+    ~Manager() {
+        if (threadPool) {
+            while(!threadPool->isNotRunning()) {
+                std::this_thread::sleep_for(std::chrono::microseconds(30));
+            }
+        }
+    }
 private:
     void resize(std::size_t newCapacity)
     {
@@ -750,6 +765,13 @@ namespace EC
         void* userData = nullptr,
         const bool useThreadPool = false)
     {
+        std::size_t current_id;
+        {
+            // push to idStack "call stack"
+            std::lock_guard<std::mutex> lock(idStackMutex);
+            current_id = idStackCounter++;
+            idStack.push_back(current_id);
+        }
         deferringDeletions.fetch_add(1);
         using SignatureComponents =
             typename EC::Meta::Matching<Signature, ComponentsList>::type;
@@ -826,9 +848,21 @@ namespace EC
                     delete data;
                 }, fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
+        // pop from idStack "call stack"
+        do {
+            {
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                if (idStack.back() == current_id) {
+                    idStack.pop_back();
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::microseconds(15));
+        } while (true);
         handleDeferredDeletions();
     }
@@ -878,6 +912,13 @@ namespace EC
         void* userData = nullptr,
         const bool useThreadPool = false)
     {
+        std::size_t current_id;
+        {
+            // push to idStack "call stack"
+            std::lock_guard<std::mutex> lock(idStackMutex);
+            current_id = idStackCounter++;
+            idStack.push_back(current_id);
+        }
         deferringDeletions.fetch_add(1);
         using SignatureComponents =
             typename EC::Meta::Matching<Signature, ComponentsList>::type;
@@ -951,9 +992,21 @@ namespace EC
                     }
                 }, &fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
+        // pop from idStack "call stack"
+        do {
+            {
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                if (idStack.back() == current_id) {
+                    idStack.pop_back();
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::microseconds(15));
+        } while (true);
         handleDeferredDeletions();
     }
@@ -1099,7 +1152,7 @@ namespace EC
                     }
                 }, &fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
     })));
@@ -1180,7 +1233,7 @@ namespace EC
                     }
                 }, &fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
         return matchingV;
@@ -1483,6 +1536,13 @@ namespace EC
         void* userData = nullptr,
         const bool useThreadPool = false)
     {
+        std::size_t current_id;
+        {
+            // push to idStack "call stack"
+            std::lock_guard<std::mutex> lock(idStackMutex);
+            current_id = idStackCounter++;
+            idStack.push_back(current_id);
+        }
         deferringDeletions.fetch_add(1);
         std::vector<std::vector<std::size_t> >
             multiMatchingEntities(SigList::size);
@@ -1565,7 +1625,7 @@ namespace EC
                     }
                 }, &fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
         // call functions on matching entities
@@ -1630,11 +1690,23 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
             }
         }
         );
+        // pop from idStack "call stack"
+        do {
+            {
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                if (idStack.back() == current_id) {
+                    idStack.pop_back();
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::microseconds(15));
+        } while (true);
         handleDeferredDeletions();
     }
@@ -1694,6 +1766,13 @@ namespace EC
         void* userData = nullptr,
         const bool useThreadPool = false)
     {
+        std::size_t current_id;
+        {
+            // push to idStack "call stack"
+            std::lock_guard<std::mutex> lock(idStackMutex);
+            current_id = idStackCounter++;
+            idStack.push_back(current_id);
+        }
         deferringDeletions.fetch_add(1);
         std::vector<std::vector<std::size_t> > multiMatchingEntities(
             SigList::size);
@@ -1776,7 +1855,7 @@ namespace EC
                     }
                 }, &fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
         // call functions on matching entities
@@ -1846,11 +1925,23 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->easyWakeAndWait();
+                threadPool->easyStartAndWait();
            }
         }
         );
+        // pop from idStack "call stack"
+        do {
+            {
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                if (idStack.back() == current_id) {
+                    idStack.pop_back();
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::microseconds(15));
+        } while (true);
         handleDeferredDeletions();
     }
@@ -1879,6 +1970,13 @@ namespace EC
     void forMatchingSimple(ForMatchingFn fn,
                            void *userData = nullptr,
                            const bool useThreadPool = false) {
+        std::size_t current_id;
+        {
+            // push to idStack "call stack"
+            std::lock_guard<std::mutex> lock(idStackMutex);
+            current_id = idStackCounter++;
+            idStack.push_back(current_id);
+        }
         deferringDeletions.fetch_add(1);
         const BitsetType signatureBitset =
             BitsetType::template generateBitset<Signature>();
@@ -1934,9 +2032,21 @@ namespace EC
                     delete data;
                 }, fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
+        // pop from idStack "call stack"
+        do {
+            {
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                if (idStack.back() == current_id) {
+                    idStack.pop_back();
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::microseconds(15));
+        } while (true);
         handleDeferredDeletions();
     }
@@ -1963,6 +2073,14 @@ namespace EC
         ForMatchingFn fn,
         void* userData = nullptr,
         const bool useThreadPool = false) {
+        std::size_t current_id;
+        {
+            // push to idStack "call stack"
+            std::lock_guard<std::mutex> lock(idStackMutex);
+            current_id = idStackCounter++;
+            idStack.push_back(current_id);
+        }
         deferringDeletions.fetch_add(1);
         if(!useThreadPool || !threadPool) {
             bool isValid;
@@ -2031,9 +2149,21 @@ namespace EC
                     }
                 }, &fnDataAr[i]);
             }
-            threadPool->easyWakeAndWait();
+            threadPool->easyStartAndWait();
         }
+        // pop from idStack "call stack"
+        do {
+            {
+                std::lock_guard<std::mutex> lock(idStackMutex);
+                if (idStack.back() == current_id) {
+                    idStack.pop_back();
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::microseconds(15));
+        } while (true);
         handleDeferredDeletions();
     }
 };


@@ -1,99 +1,52 @@
 #ifndef EC_META_SYSTEM_THREADPOOL_HPP
 #define EC_META_SYSTEM_THREADPOOL_HPP
-#include <type_traits>
-#include <vector>
-#include <thread>
 #include <atomic>
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <functional>
-#include <tuple>
 #include <chrono>
-#include <unordered_set>
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <tuple>
+#include <vector>
 #ifndef NDEBUG
-# include <iostream>
+#include <iostream>
 #endif
 namespace EC {
 namespace Internal {
-using TPFnType = std::function<void(void*)>;
-using TPTupleType = std::tuple<TPFnType, void*>;
+using TPFnType = std::function<void(void *)>;
+using TPTupleType = std::tuple<TPFnType, void *>;
 using TPQueueType = std::queue<TPTupleType>;
-template <unsigned int SIZE>
-void thread_fn(std::atomic_bool *isAlive,
-               std::condition_variable *cv,
-               std::mutex *cvMutex,
-               Internal::TPQueueType *fnQueue,
-               std::mutex *queueMutex,
-               std::atomic_int *waitCount) {
-    bool hasFn = false;
-    Internal::TPTupleType fnTuple;
-    while(isAlive->load()) {
-        hasFn = false;
-        {
-            std::lock_guard<std::mutex> lock(*queueMutex);
-            if(!fnQueue->empty()) {
-                fnTuple = fnQueue->front();
-                fnQueue->pop();
-                hasFn = true;
-            }
-        }
-        if(hasFn) {
-            std::get<0>(fnTuple)(std::get<1>(fnTuple));
-            continue;
-        }
-        waitCount->fetch_add(1);
-        {
-            std::unique_lock<std::mutex> lock(*cvMutex);
-            cv->wait(lock);
-        }
-        waitCount->fetch_sub(1);
-    }
-}
+using ThreadPtr = std::unique_ptr<std::thread>;
+using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;
+using ThreadStacksType = std::deque<ThreadStackType>;
+using ThreadStacksMutexesT = std::deque<std::mutex>;
+using ThreadCountersT = std::deque<std::atomic_uint>;
+using PointersT =
+    std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *>;
 } // namespace Internal
 /*!
     \brief Implementation of a Thread Pool.
-    Note that if SIZE is less than 2, then ThreadPool will not create threads and
-    run queued functions on the calling thread.
+    Note that if SIZE is less than 2, then ThreadPool will not create threads
+    and run queued functions on the calling thread.
 */
-template <unsigned int SIZE>
+template <unsigned int MAXSIZE>
 class ThreadPool {
 public:
-    ThreadPool() {
-        waitCount.store(0);
-        extraThreadCount.store(0);
-        isAlive.store(true);
-        if(SIZE >= 2) {
-            for(unsigned int i = 0; i < SIZE; ++i) {
-                threads.emplace_back(Internal::thread_fn<SIZE>,
-                                     &isAlive,
-                                     &cv,
-                                     &cvMutex,
-                                     &fnQueue,
-                                     &queueMutex,
-                                     &waitCount);
-                threadsIDs.insert(threads.back().get_id());
-            }
-        }
-    }
+    ThreadPool()
+        : threadStacks{}, threadStackMutexes{}, fnQueue{}, queueMutex{} {}
     ~ThreadPool() {
-        if(SIZE >= 2) {
-            isAlive.store(false);
-            std::this_thread::sleep_for(std::chrono::milliseconds(20));
-            cv.notify_all();
-            for(auto &thread : threads) {
-                thread.join();
-            }
-            std::this_thread::sleep_for(std::chrono::milliseconds(20));
+        while (!isNotRunning()) {
+            std::this_thread::sleep_for(std::chrono::microseconds(30));
         }
     }
@@ -104,87 +57,97 @@ public:
    waiting threads which will start pulling functions from the queue to be
    called.
    */
-    void queueFn(std::function<void(void*)>&& fn, void *ud = nullptr) {
+    void queueFn(std::function<void(void *)> &&fn, void *ud = nullptr) {
        std::lock_guard<std::mutex> lock(queueMutex);
        fnQueue.emplace(std::make_tuple(fn, ud));
    }
-    /*!
-    \brief Wakes waiting threads to start running queued functions.
-    If SIZE is less than 2, then this function call will block until all the
-    queued functions have been executed on the calling thread.
-    If SIZE is 2 or greater, then this function will return immediately after
-    waking one or all threads, depending on the given boolean parameter.
-    */
-    void wakeThreads(const bool wakeAll = true) {
-        if(SIZE >= 2) {
-            // wake threads to pull functions from queue and run them
-            if(wakeAll) {
-                cv.notify_all();
-            } else {
-                cv.notify_one();
-            }
-            // check if all threads are running a task, and spawn a new thread
-            // if this is the case
-            Internal::TPTupleType fnTuple;
-            bool hasFn = false;
-            if (waitCount.load(std::memory_order_relaxed) == 0) {
-                std::lock_guard<std::mutex> queueLock(queueMutex);
-                if (!fnQueue.empty()) {
-                    fnTuple = fnQueue.front();
-                    fnQueue.pop();
-                    hasFn = true;
-                }
-            }
-            if (hasFn) {
-#ifndef NDEBUG
-                std::cout << "Spawning extra thread...\n";
-#endif
-                extraThreadCount.fetch_add(1);
-                std::thread newThread = std::thread(
-                    [] (Internal::TPTupleType &&tuple, std::atomic_int *count) {
-                        std::get<0>(tuple)(std::get<1>(tuple));
-#ifndef NDEBUG
-                        std::cout << "Stopping extra thread...\n";
-#endif
-                        count->fetch_sub(1);
-                    },
-                    std::move(fnTuple), &extraThreadCount);
-                newThread.detach();
-            }
+    void startThreads() {
+        if (MAXSIZE >= 2) {
+            checkStacks();
+            auto pointers = newStackEntry();
+            Internal::ThreadStackType *threadStack = std::get<0>(pointers);
+            std::mutex *threadStackMutex = std::get<1>(pointers);
+            std::atomic_uint *aCounter = std::get<2>(pointers);
+            for (unsigned int i = 0; i < MAXSIZE; ++i) {
+                std::thread *newThread = new std::thread(
+                    [](Internal::ThreadStackType *threadStack,
+                       std::mutex *threadStackMutex,
+                       Internal::TPQueueType *fnQueue, std::mutex *queueMutex,
+                       std::atomic_uint *initCount) {
+                        // add id to idStack "call stack"
+                        {
+                            std::lock_guard<std::mutex> lock(*threadStackMutex);
+                            threadStack->push_back(
+                                {Internal::ThreadPtr(nullptr),
+                                 std::this_thread::get_id()});
+                        }
+                        ++(*initCount);
+                        // fetch queued fns and execute them
+                        // fnTuples must live until end of function
+                        std::list<Internal::TPTupleType> fnTuples;
+                        do {
+                            bool fnFound = false;
+                            {
+                                std::lock_guard<std::mutex> lock(*queueMutex);
+                                if (!fnQueue->empty()) {
+                                    fnTuples.emplace_back(
+                                        std::move(fnQueue->front()));
+                                    fnQueue->pop();
+                                    fnFound = true;
+                                }
+                            }
+                            if (fnFound) {
+                                std::get<0>(fnTuples.back())(
+                                    std::get<1>(fnTuples.back()));
+                            } else {
+                                break;
+                            }
+                        } while (true);
+                        // pop id from idStack "call stack"
+                        do {
+                            std::this_thread::sleep_for(
+                                std::chrono::microseconds(15));
+                            if (initCount->load() != MAXSIZE) {
+                                continue;
+                            }
+                            {
+                                std::lock_guard<std::mutex> lock(
+                                    *threadStackMutex);
+                                if (std::get<1>(threadStack->back()) ==
+                                    std::this_thread::get_id()) {
+                                    if (!std::get<0>(threadStack->back())) {
+                                        continue;
+                                    }
+                                    std::get<0>(threadStack->back())->detach();
+                                    threadStack->pop_back();
+                                    if (threadStack->empty()) {
+                                        initCount->store(0);
+                                    }
+                                    break;
+                                }
+                            }
+                        } while (true);
+                    },
+                    threadStack, threadStackMutex, &fnQueue, &queueMutex,
+                    aCounter);
+                while (aCounter->load() != i + 1) {
+                    std::this_thread::sleep_for(std::chrono::microseconds(15));
+                }
+                std::lock_guard<std::mutex> stackLock(*threadStackMutex);
+                std::get<0>(threadStack->at(i)).reset(newThread);
+            }
+            while (aCounter->load() != MAXSIZE) {
+                std::this_thread::sleep_for(std::chrono::microseconds(15));
+            }
        } else {
            sequentiallyRunTasks();
        }
    }
-    /*!
-    \brief Gets the number of waiting threads.
-    If all threads are waiting, this should equal ThreadCount.
-    If SIZE is less than 2, then this will always return 0.
-    */
-    int getWaitCount() {
-        return waitCount.load(std::memory_order_relaxed);
-    }
-    /*!
-    \brief Returns true if all threads are waiting.
-    If SIZE is less than 2, then this will always return true.
-    */
-    bool isAllThreadsWaiting() {
-        if(SIZE >= 2) {
-            return waitCount.load(std::memory_order_relaxed) == SIZE;
-        } else {
-            return true;
-        }
-    }
    /*!
    \brief Returns true if the function queue is empty.
    */
@@ -196,43 +159,54 @@ public:
    /*!
    \brief Returns the ThreadCount that this class was created with.
    */
-    constexpr unsigned int getThreadCount() {
-        return SIZE;
-    }
+    constexpr unsigned int getMaxThreadCount() { return MAXSIZE; }
-    /*!
-    \brief Wakes all threads and blocks until all queued tasks are finished.
-    If SIZE is less than 2, then this function call will block until all the
-    queued functions have been executed on the calling thread.
-    If SIZE is 2 or greater, then this function will block until all the
-    queued functions have been executed by the threads in the thread pool.
-    */
-    void easyWakeAndWait() {
-        if(SIZE >= 2) {
+    void easyStartAndWait() {
+        if (MAXSIZE >= 2) {
+            startThreads();
            do {
-                wakeThreads();
-                std::this_thread::sleep_for(std::chrono::microseconds(150));
-            } while(!isQueueEmpty()
-                || (threadsIDs.find(std::this_thread::get_id()) != threadsIDs.end()
-                    && extraThreadCount.load(std::memory_order_relaxed) != 0));
-            // } while(!isQueueEmpty() || !isAllThreadsWaiting());
+                std::this_thread::sleep_for(std::chrono::microseconds(30));
+                bool isQueueEmpty = false;
+                {
+                    std::lock_guard<std::mutex> lock(queueMutex);
+                    isQueueEmpty = fnQueue.empty();
+                }
+                if (isQueueEmpty) {
+                    break;
+                }
+            } while (true);
        } else {
            sequentiallyRunTasks();
        }
    }
+    bool isNotRunning() {
+        std::lock_guard<std::mutex> lock(dequesMutex);
+        auto tIter = threadStacks.begin();
+        auto mIter = threadStackMutexes.begin();
+        while (tIter != threadStacks.end() &&
+               mIter != threadStackMutexes.end()) {
+            {
+                std::lock_guard<std::mutex> lock(*mIter);
+                if (!tIter->empty()) {
+                    return false;
+                }
+            }
+            ++tIter;
+            ++mIter;
+        }
+        return true;
+    }
-private:
-    std::vector<std::thread> threads;
-    std::unordered_set<std::thread::id> threadsIDs;
-    std::atomic_bool isAlive;
-    std::condition_variable cv;
-    std::mutex cvMutex;
+private:
+    Internal::ThreadStacksType threadStacks;
+    Internal::ThreadStacksMutexesT threadStackMutexes;
    Internal::TPQueueType fnQueue;
    std::mutex queueMutex;
-    std::atomic_int waitCount;
-    std::atomic_int extraThreadCount;
+    Internal::ThreadCountersT threadCounters;
+    std::mutex dequesMutex;
    void sequentiallyRunTasks() {
        // pull functions from queue and run them on current thread
@@ -241,7 +215,7 @@ private:
        do {
            {
                std::lock_guard<std::mutex> lock(queueMutex);
-                if(!fnQueue.empty()) {
+                if (!fnQueue.empty()) {
                    hasFn = true;
                    fnTuple = fnQueue.front();
                    fnQueue.pop();
@@ -249,14 +223,50 @@ private:
                    hasFn = false;
                }
            }
-            if(hasFn) {
+            if (hasFn) {
                std::get<0>(fnTuple)(std::get<1>(fnTuple));
            }
-        } while(hasFn);
+        } while (hasFn);
    }
+    void checkStacks() {
+        std::lock_guard<std::mutex> lock(dequesMutex);
+        if (threadStacks.empty()) {
+            return;
+        }
+        bool erased = false;
+        do {
+            erased = false;
+            {
+                std::lock_guard<std::mutex> lock(threadStackMutexes.front());
+                if (threadStacks.front().empty()) {
+                    threadStacks.pop_front();
+                    threadCounters.pop_front();
+                    erased = true;
+                }
+            }
+            if (erased) {
+                threadStackMutexes.pop_front();
+            } else {
+                break;
+            }
+        } while (!threadStacks.empty() && !threadStackMutexes.empty() &&
+                 !threadCounters.empty());
+    }
+    Internal::PointersT newStackEntry() {
+        std::lock_guard<std::mutex> lock(dequesMutex);
+        threadStacks.emplace_back();
+        threadStackMutexes.emplace_back();
+        threadCounters.emplace_back();
+        threadCounters.back().store(0);
+        return {&threadStacks.back(), &threadStackMutexes.back(),
+                &threadCounters.back()};
+    }
 };
 } // namespace EC
 #endif


@@ -1458,8 +1458,8 @@ TEST(EC, NestedThreadPoolTasks) {
                 EXPECT_NE(outer_c->x, inner_c->x);
                 EXPECT_NE(outer_c->y, inner_c->y);
             }
-        }, c, false);
+        }, c, true);
     }, &manager, true);
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    //std::this_thread::sleep_for(std::chrono::milliseconds(100));
 }


@@ -16,22 +16,22 @@ TEST(ECThreadPool, OneThread) {
     p.queueFn(fn, &data);
-    p.wakeThreads();
+    p.startThreads();
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
     ASSERT_EQ(data.load(), 1);
     for(unsigned int i = 0; i < 10; ++i) {
         p.queueFn(fn, &data);
     }
-    p.wakeThreads();
+    p.startThreads();
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
     ASSERT_EQ(data.load(), 11);
 }
@@ -47,22 +47,22 @@ TEST(ECThreadPool, Simple) {
     p.queueFn(fn, &data);
-    p.wakeThreads();
+    p.startThreads();
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
     ASSERT_EQ(data.load(), 1);
     for(unsigned int i = 0; i < 10; ++i) {
         p.queueFn(fn, &data);
     }
-    p.wakeThreads();
+    p.startThreads();
     do {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    } while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
+    } while(!p.isQueueEmpty() || !p.isNotRunning());
     ASSERT_EQ(data.load(), 11);
 }
@@ -70,15 +70,15 @@ TEST(ECThreadPool, Simple) {
 TEST(ECThreadPool, QueryCount) {
     {
         OneThreadPool oneP;
-        ASSERT_EQ(1, oneP.getThreadCount());
+        ASSERT_EQ(1, oneP.getMaxThreadCount());
     }
     {
         ThreeThreadPool threeP;
-        ASSERT_EQ(3, threeP.getThreadCount());
+        ASSERT_EQ(3, threeP.getMaxThreadCount());
     }
 }
-TEST(ECThreadPool, easyWakeAndWait) {
+TEST(ECThreadPool, easyStartAndWait) {
     std::atomic_int data;
     data.store(0);
     {
@@ -89,7 +89,7 @@ TEST(ECThreadPool, easyWakeAndWait) {
                 atomicInt->fetch_add(1);
             }, &data);
         }
-        oneP.easyWakeAndWait();
+        oneP.easyStartAndWait();
         EXPECT_EQ(20, data.load());
     }
     {
@@ -100,7 +100,7 @@ TEST(ECThreadPool, easyWakeAndWait) {
                 atomicInt->fetch_add(1);
             }, &data);
         }
-        threeP.easyWakeAndWait();
+        threeP.easyStartAndWait();
         EXPECT_EQ(40, data.load());
     }
 }