Merge branch 'nested_threads'

This commit is contained in:
Stephen Seo 2022-06-16 13:42:52 +09:00
commit cc95c9758e
5 changed files with 2241 additions and 1950 deletions

212
.clang-format Normal file
View file

@ -0,0 +1,212 @@
---
Language: Cpp
# BasedOnStyle: Google
AccessModifierOffset: -1
AlignAfterOpenBracket: Align
AlignArrayOfStructures: None
AlignConsecutiveMacros: None
AlignConsecutiveAssignments: None
AlignConsecutiveBitFields: None
AlignConsecutiveDeclarations: None
AlignEscapedNewlines: Left
AlignOperands: Align
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: true
AllowAllConstructorInitializersOnNextLine: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortEnumsOnASingleLine: true
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: All
AllowShortLambdasOnASingleLine: All
AllowShortIfStatementsOnASingleLine: WithoutElse
AllowShortLoopsOnASingleLine: true
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: Yes
AttributeMacros:
- __capability
BinPackArguments: true
BinPackParameters: true
BraceWrapping:
AfterCaseLabel: false
AfterClass: false
AfterControlStatement: Never
AfterEnum: false
AfterFunction: false
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
AfterExternBlock: false
BeforeCatch: false
BeforeElse: false
BeforeLambdaBody: false
BeforeWhile: false
IndentBraces: false
SplitEmptyFunction: true
SplitEmptyRecord: true
SplitEmptyNamespace: true
BreakBeforeBinaryOperators: None
BreakBeforeConceptDeclarations: true
BreakBeforeBraces: Attach
BreakBeforeInheritanceComma: false
BreakInheritanceList: BeforeColon
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: false
BreakConstructorInitializers: BeforeColon
BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: true
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DeriveLineEnding: true
DerivePointerAlignment: true
DisableFormat: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: LogicalBlock
ExperimentalAutoDetectBinPacking: false
FixNamespaceComments: true
ForEachMacros:
- foreach
- Q_FOREACH
- BOOST_FOREACH
IfMacros:
- KJ_IF_MAYBE
IncludeBlocks: Regroup
IncludeCategories:
- Regex: '^<ext/.*\.h>'
Priority: 2
SortPriority: 0
CaseSensitive: false
- Regex: '^<.*\.h>'
Priority: 1
SortPriority: 0
CaseSensitive: false
- Regex: '^<.*'
Priority: 2
SortPriority: 0
CaseSensitive: false
- Regex: '.*'
Priority: 3
SortPriority: 0
CaseSensitive: false
IncludeIsMainRegex: '([-_](test|unittest))?$'
IncludeIsMainSourceRegex: ''
IndentAccessModifiers: false
IndentCaseLabels: true
IndentCaseBlocks: false
IndentGotoLabels: true
IndentPPDirectives: None
IndentExternBlock: AfterExternBlock
IndentRequires: false
IndentWidth: 4
IndentWrappedFunctionNames: false
InsertTrailingCommas: None
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: false
LambdaBodyIndentation: Signature
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCBinPackProtocolList: Never
ObjCBlockIndentWidth: 4
ObjCBreakBeforeNestedBlockParam: true
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: true
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PenaltyIndentedWhitespace: 0
PointerAlignment: Left
PPIndentWidth: -1
RawStringFormats:
- Language: Cpp
Delimiters:
- cc
- CC
- cpp
- Cpp
- CPP
- 'c++'
- 'C++'
CanonicalDelimiter: ''
BasedOnStyle: google
- Language: TextProto
Delimiters:
- pb
- PB
- proto
- PROTO
EnclosingFunctions:
- EqualsProto
- EquivToProto
- PARSE_PARTIAL_TEXT_PROTO
- PARSE_TEST_PROTO
- PARSE_TEXT_PROTO
- ParseTextOrDie
- ParseTextProtoOrDie
- ParseTestProto
- ParsePartialTestProto
CanonicalDelimiter: pb
BasedOnStyle: google
ReferenceAlignment: Pointer
ReflowComments: true
ShortNamespaceLines: 1
SortIncludes: CaseSensitive
SortJavaStaticImport: Before
SortUsingDeclarations: true
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
SpaceBeforeCaseColon: false
SpaceBeforeCpp11BracedList: false
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
SpaceAroundPointerQualifiers: Default
SpaceBeforeRangeBasedForLoopColon: true
SpaceInEmptyBlock: false
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 2
SpacesInAngles: Never
SpacesInConditionalStatement: false
SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInLineCommentPrefix:
Minimum: 1
Maximum: -1
SpacesInParentheses: false
SpacesInSquareBrackets: false
SpaceBeforeSquareBrackets: false
BitFieldColonSpacing: Both
Standard: Auto
StatementAttributeLikeMacros:
- Q_EMIT
StatementMacros:
- Q_UNUSED
- QT_REQUIRE_VERSION
TabWidth: 4
UseCRLF: false
UseTab: Never
WhitespaceSensitiveMacros:
- STRINGIZE
- PP_STRINGIZE
- BOOST_PP_STRINGIZE
- NS_SWIFT_NAME
- CF_SWIFT_NAME
...

File diff suppressed because it is too large Load diff

View file

@ -1,16 +1,21 @@
#ifndef EC_META_SYSTEM_THREADPOOL_HPP
#define EC_META_SYSTEM_THREADPOOL_HPP
#include <type_traits>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <tuple>
#include <chrono>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <vector>
#ifndef NDEBUG
#include <iostream>
#endif
namespace EC {
@ -18,81 +23,43 @@ namespace Internal {
using TPFnType = std::function<void(void *)>;
using TPTupleType = std::tuple<TPFnType, void *>;
using TPQueueType = std::queue<TPTupleType>;
using ThreadPtr = std::unique_ptr<std::thread>;
using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;
using ThreadStacksType = std::deque<ThreadStackType>;
using ThreadStacksMutexesT = std::deque<std::mutex>;
using ThreadCountersT = std::deque<std::atomic_uint>;
using PtrsHoldT = std::deque<std::atomic_bool>;
using PointersT = std::tuple<ThreadStackType *, std::mutex *,
std::atomic_uint *, std::atomic_bool *>;
} // namespace Internal
/*!
\brief Implementation of a Thread Pool.
Note that if SIZE is less than 2, then ThreadPool will not create threads and
run queued functions on the calling thread.
Note that MAXSIZE template parameter determines how many threads are created
each time that startThreads() (or easyStartAndWait()) is called.
*/
template <unsigned int SIZE>
template <unsigned int MAXSIZE>
class ThreadPool {
public:
ThreadPool() : waitCount(0) {
isAlive.store(true);
if(SIZE >= 2) {
for(unsigned int i = 0; i < SIZE; ++i) {
threads.emplace_back([] (std::atomic_bool *isAlive,
std::condition_variable *cv,
std::mutex *cvMutex,
Internal::TPQueueType *fnQueue,
std::mutex *queueMutex,
int *waitCount,
std::mutex *waitCountMutex) {
bool hasFn = false;
Internal::TPTupleType fnTuple;
while(isAlive->load()) {
hasFn = false;
{
std::lock_guard<std::mutex> lock(*queueMutex);
if(!fnQueue->empty()) {
fnTuple = fnQueue->front();
fnQueue->pop();
hasFn = true;
}
}
if(hasFn) {
std::get<0>(fnTuple)(std::get<1>(fnTuple));
continue;
}
{
std::lock_guard<std::mutex> lock(*waitCountMutex);
*waitCount += 1;
}
{
std::unique_lock<std::mutex> lock(*cvMutex);
cv->wait(lock);
}
{
std::lock_guard<std::mutex> lock(*waitCountMutex);
*waitCount -= 1;
}
}
}, &isAlive, &cv, &cvMutex, &fnQueue, &queueMutex, &waitCount,
&waitCountMutex);
}
}
}
ThreadPool()
: threadStacks{}, threadStackMutexes{}, fnQueue{}, queueMutex{} {}
~ThreadPool() {
if(SIZE >= 2) {
isAlive.store(false);
std::this_thread::sleep_for(std::chrono::milliseconds(20));
cv.notify_all();
for(auto &thread : threads) {
thread.join();
}
while (!isNotRunning()) {
std::this_thread::sleep_for(std::chrono::microseconds(30));
}
}
/*!
\brief Queues a function to be called (doesn't start calling yet).
To run the queued functions, wakeThreads() must be called to wake the
To run the queued functions, startThreads() must be called to wake the
waiting threads which will start pulling functions from the queue to be
called.
Note that the easyStartAndWait() calls startThreads() and waits until
the threads have finished execution.
*/
void queueFn(std::function<void(void *)> &&fn, void *ud = nullptr) {
std::lock_guard<std::mutex> lock(queueMutex);
@ -100,51 +67,95 @@ public:
}
/*!
\brief Wakes waiting threads to start running queued functions.
\brief Creates MAXSIZE threads that will process queueFn() functions.
If SIZE is less than 2, then this function call will block until all the
queued functions have been executed on the calling thread.
If SIZE is 2 or greater, then this function will return immediately after
waking one or all threads, depending on the given boolean parameter.
Note that if MAXSIZE < 2, then this function will synchronously execute
the queued functions and block until the functions have been executed.
Otherwise, this function may return before the queued functions have
been executed.
*/
void wakeThreads(bool wakeAll = true) {
if(SIZE >= 2) {
// wake threads to pull functions from queue and run them
if(wakeAll) {
cv.notify_all();
} else {
cv.notify_one();
Internal::PointersT startThreads() {
if (MAXSIZE >= 2) {
checkStacks();
auto pointers = newStackEntry();
Internal::ThreadStackType *threadStack = std::get<0>(pointers);
std::mutex *threadStackMutex = std::get<1>(pointers);
std::atomic_uint *aCounter = std::get<2>(pointers);
for (unsigned int i = 0; i < MAXSIZE; ++i) {
std::thread *newThread = new std::thread(
[](Internal::ThreadStackType *threadStack,
std::mutex *threadStackMutex,
Internal::TPQueueType *fnQueue, std::mutex *queueMutex,
std::atomic_uint *initCount) {
// add id to idStack "call stack"
{
std::lock_guard<std::mutex> lock(*threadStackMutex);
threadStack->push_back(
{Internal::ThreadPtr(nullptr),
std::this_thread::get_id()});
}
++(*initCount);
// fetch queued fns and execute them
// fnTuples must live until end of function
std::list<Internal::TPTupleType> fnTuples;
do {
bool fnFound = false;
{
std::lock_guard<std::mutex> lock(*queueMutex);
if (!fnQueue->empty()) {
fnTuples.emplace_back(
std::move(fnQueue->front()));
fnQueue->pop();
fnFound = true;
}
}
if (fnFound) {
std::get<0>(fnTuples.back())(
std::get<1>(fnTuples.back()));
} else {
break;
}
} while (true);
// pop id from idStack "call stack"
do {
std::this_thread::sleep_for(
std::chrono::microseconds(15));
if (initCount->load() != MAXSIZE) {
continue;
}
{
std::lock_guard<std::mutex> lock(
*threadStackMutex);
if (std::get<1>(threadStack->back()) ==
std::this_thread::get_id()) {
if (!std::get<0>(threadStack->back())) {
continue;
}
std::get<0>(threadStack->back())->detach();
threadStack->pop_back();
break;
}
}
} while (true);
},
threadStack, threadStackMutex, &fnQueue, &queueMutex,
aCounter);
// Wait until thread has pushed to threadStack before setting
// the handle to it
while (aCounter->load() != i + 1) {
std::this_thread::sleep_for(std::chrono::microseconds(15));
}
std::lock_guard<std::mutex> stackLock(*threadStackMutex);
std::get<0>(threadStack->at(i)).reset(newThread);
}
return pointers;
} else {
sequentiallyRunTasks();
}
}
/*!
\brief Gets the number of waiting threads.
If all threads are waiting, this should equal ThreadCount.
If SIZE is less than 2, then this will always return 0.
*/
int getWaitCount() {
std::lock_guard<std::mutex> lock(waitCountMutex);
return waitCount;
}
/*!
\brief Returns true if all threads are waiting.
If SIZE is less than 2, then this will always return true.
*/
bool isAllThreadsWaiting() {
if(SIZE >= 2) {
std::lock_guard<std::mutex> lock(waitCountMutex);
return waitCount == SIZE;
} else {
return true;
}
return {nullptr, nullptr, nullptr, nullptr};
}
/*!
@ -156,41 +167,80 @@ public:
}
/*!
\brief Returns the ThreadCount that this class was created with.
\brief Returns the MAXSIZE count that this class was created with.
*/
constexpr unsigned int getThreadCount() {
return SIZE;
}
constexpr unsigned int getMaxThreadCount() { return MAXSIZE; }
/*!
\brief Wakes all threads and blocks until all queued tasks are finished.
\brief Calls startThreads() and waits until all threads have finished.
If SIZE is less than 2, then this function call will block until all the
queued functions have been executed on the calling thread.
If SIZE is 2 or greater, then this function will block until all the
queued functions have been executed by the threads in the thread pool.
Regardless of the value set to MAXSIZE, this function will block until
all previously queued functions have been executed.
*/
void easyWakeAndWait() {
if(SIZE >= 2) {
wakeThreads();
void easyStartAndWait() {
if (MAXSIZE >= 2) {
Internal::PointersT pointers = startThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(150));
} while(!isQueueEmpty() || !isAllThreadsWaiting());
std::this_thread::sleep_for(std::chrono::microseconds(30));
bool isQueueEmpty = false;
{
std::lock_guard<std::mutex> lock(queueMutex);
isQueueEmpty = fnQueue.empty();
}
if (isQueueEmpty) {
break;
}
} while (true);
if (std::get<0>(pointers)) {
do {
{
std::lock_guard<std::mutex> lock(
*std::get<1>(pointers));
if (std::get<0>(pointers)->empty()) {
std::get<3>(pointers)->store(false);
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(15));
} while (true);
}
} else {
sequentiallyRunTasks();
}
}
/*!
\brief Checks if any threads are currently running, returning true if
there are no threads running.
*/
bool isNotRunning() {
std::lock_guard<std::mutex> lock(dequesMutex);
auto tIter = threadStacks.begin();
auto mIter = threadStackMutexes.begin();
while (tIter != threadStacks.end() &&
mIter != threadStackMutexes.end()) {
{
std::lock_guard<std::mutex> lock(*mIter);
if (!tIter->empty()) {
return false;
}
}
++tIter;
++mIter;
}
return true;
}
private:
std::vector<std::thread> threads;
std::atomic_bool isAlive;
std::condition_variable cv;
std::mutex cvMutex;
Internal::ThreadStacksType threadStacks;
Internal::ThreadStacksMutexesT threadStackMutexes;
Internal::TPQueueType fnQueue;
std::mutex queueMutex;
int waitCount;
std::mutex waitCountMutex;
Internal::ThreadCountersT threadCounters;
Internal::PtrsHoldT ptrsHoldBools;
std::mutex dequesMutex;
void sequentiallyRunTasks() {
// pull functions from queue and run them on current thread
@ -213,6 +263,47 @@ private:
} while (hasFn);
}
void checkStacks() {
std::lock_guard<std::mutex> lock(dequesMutex);
if (threadStacks.empty()) {
return;
}
bool erased = false;
do {
erased = false;
{
std::lock_guard<std::mutex> lock(threadStackMutexes.front());
if (ptrsHoldBools.front().load()) {
break;
} else if (threadStacks.front().empty()) {
threadStacks.pop_front();
threadCounters.pop_front();
ptrsHoldBools.pop_front();
erased = true;
}
}
if (erased) {
threadStackMutexes.pop_front();
} else {
break;
}
} while (!threadStacks.empty() && !threadStackMutexes.empty() &&
!threadCounters.empty() && !ptrsHoldBools.empty());
}
Internal::PointersT newStackEntry() {
std::lock_guard<std::mutex> lock(dequesMutex);
threadStacks.emplace_back();
threadStackMutexes.emplace_back();
threadCounters.emplace_back();
threadCounters.back().store(0);
ptrsHoldBools.emplace_back();
ptrsHoldBools.back().store(true);
return {&threadStacks.back(), &threadStackMutexes.back(),
&threadCounters.back(), &ptrsHoldBools.back()};
}
};
} // namespace EC

View file

@ -1,7 +1,9 @@
#include <gtest/gtest.h>
#include <chrono>
#include <iostream>
#include <thread>
#include <tuple>
#include <memory>
#include <unordered_map>
@ -1431,3 +1433,33 @@ TEST(EC, ManagerDeferredDeletions) {
}
}
}
TEST(EC, NestedThreadPoolTasks) {
using ManagerType = EC::Manager<ListComponentsAll, ListTagsAll, 2>;
ManagerType manager;
std::array<std::size_t, 64> entities;
for (auto &entity : entities) {
entity = manager.addEntity();
manager.addComponent<C0>(entity, entity, entity);
}
manager.forMatchingSignature<EC::Meta::TypeList<C0>>([] (std::size_t id, void *data, C0 *c) {
ManagerType *manager = (ManagerType*)data;
manager->forMatchingSignature<EC::Meta::TypeList<C0>>([id] (std::size_t inner_id, void* data, C0 *inner_c) {
const C0 *const outer_c = (C0*)data;
EXPECT_EQ(id, outer_c->x);
EXPECT_EQ(inner_id, inner_c->x);
if (id == inner_id) {
EXPECT_EQ(outer_c->x, inner_c->x);
EXPECT_EQ(outer_c->y, inner_c->y);
} else {
EXPECT_NE(outer_c->x, inner_c->x);
EXPECT_NE(outer_c->y, inner_c->y);
}
}, c, true);
}, &manager, true);
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

View file

@ -16,22 +16,22 @@ TEST(ECThreadPool, OneThread) {
p.queueFn(fn, &data);
p.wakeThreads();
p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
} while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 1);
for(unsigned int i = 0; i < 10; ++i) {
p.queueFn(fn, &data);
}
p.wakeThreads();
p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
} while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 11);
}
@ -47,22 +47,22 @@ TEST(ECThreadPool, Simple) {
p.queueFn(fn, &data);
p.wakeThreads();
p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
} while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 1);
for(unsigned int i = 0; i < 10; ++i) {
p.queueFn(fn, &data);
}
p.wakeThreads();
p.startThreads();
do {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} while(!p.isQueueEmpty() || !p.isAllThreadsWaiting());
} while(!p.isQueueEmpty() || !p.isNotRunning());
ASSERT_EQ(data.load(), 11);
}
@ -70,15 +70,15 @@ TEST(ECThreadPool, Simple) {
TEST(ECThreadPool, QueryCount) {
{
OneThreadPool oneP;
ASSERT_EQ(1, oneP.getThreadCount());
ASSERT_EQ(1, oneP.getMaxThreadCount());
}
{
ThreeThreadPool threeP;
ASSERT_EQ(3, threeP.getThreadCount());
ASSERT_EQ(3, threeP.getMaxThreadCount());
}
}
TEST(ECThreadPool, easyWakeAndWait) {
TEST(ECThreadPool, easyStartAndWait) {
std::atomic_int data;
data.store(0);
{
@ -89,7 +89,7 @@ TEST(ECThreadPool, easyWakeAndWait) {
atomicInt->fetch_add(1);
}, &data);
}
oneP.easyWakeAndWait();
oneP.easyStartAndWait();
EXPECT_EQ(20, data.load());
}
{
@ -100,7 +100,7 @@ TEST(ECThreadPool, easyWakeAndWait) {
atomicInt->fetch_add(1);
}, &data);
}
threeP.easyWakeAndWait();
threeP.easyStartAndWait();
EXPECT_EQ(40, data.load());
}
}