Add easyWakeAndWait() to ThreadPool

Changed usage of ThreadPool in EC::Manager to use easyWakeAndWait().
This commit is contained in:
Stephen Seo 2021-09-09 15:53:55 +09:00
parent e0a18900e4
commit f27c22675a
3 changed files with 79 additions and 71 deletions

View file

@ -771,11 +771,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
} }
@ -892,11 +888,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
} }
@ -1037,12 +1029,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(
std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
}))); })));
@ -1117,11 +1104,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
return matchingV; return matchingV;
@ -1494,11 +1477,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
// call functions on matching entities // call functions on matching entities
@ -1560,12 +1539,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(
std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
} }
); );
@ -1703,11 +1677,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
// call functions on matching entities // call functions on matching entities
@ -1774,12 +1744,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(
std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
} }
); );
@ -1857,11 +1822,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
} }
@ -1950,11 +1911,7 @@ namespace EC
} }
}, &fnDataAr[i]); }, &fnDataAr[i]);
} }
threadPool->wakeThreads(); threadPool->easyWakeAndWait();
do {
std::this_thread::sleep_for(std::chrono::microseconds(200));
} while(!threadPool->isQueueEmpty()
|| !threadPool->isAllThreadsWaiting());
} }
} }
}; };

View file

@ -117,24 +117,7 @@ public:
cv.notify_one(); cv.notify_one();
} }
} else { } else {
// pull functions from queue and run them on main thread sequentiallyRunTasks();
Internal::TPTupleType fnTuple;
bool hasFn;
do {
{
std::lock_guard<std::mutex> lock(queueMutex);
if(!fnQueue.empty()) {
hasFn = true;
fnTuple = fnQueue.front();
fnQueue.pop();
} else {
hasFn = false;
}
}
if(hasFn) {
std::get<0>(fnTuple)(std::get<1>(fnTuple));
}
} while(hasFn);
} }
} }
@ -179,6 +162,26 @@ public:
return SIZE; return SIZE;
} }
/*!
\brief Wakes all threads and blocks until all queued tasks are finished.
If SIZE is less than 2, then this function call will block until all the
queued functions have been executed on the calling thread.
If SIZE is 2 or greater, then this function will block until all the
queued functions have been executed by the threads in the thread pool.
*/
void easyWakeAndWait() {
if(SIZE >= 2) {
wakeThreads();
do {
std::this_thread::sleep_for(std::chrono::microseconds(150));
} while(!isQueueEmpty() || !isAllThreadsWaiting());
} else {
sequentiallyRunTasks();
}
}
private: private:
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::atomic_bool isAlive; std::atomic_bool isAlive;
@ -189,6 +192,27 @@ private:
int waitCount; int waitCount;
std::mutex waitCountMutex; std::mutex waitCountMutex;
void sequentiallyRunTasks() {
// pull functions from queue and run them on current thread
Internal::TPTupleType fnTuple;
bool hasFn;
do {
{
std::lock_guard<std::mutex> lock(queueMutex);
if(!fnQueue.empty()) {
hasFn = true;
fnTuple = fnQueue.front();
fnQueue.pop();
} else {
hasFn = false;
}
}
if(hasFn) {
std::get<0>(fnTuple)(std::get<1>(fnTuple));
}
} while(hasFn);
}
}; };
} // namespace EC } // namespace EC

View file

@ -77,3 +77,30 @@ TEST(ECThreadPool, QueryCount) {
ASSERT_EQ(3, threeP.getThreadCount()); ASSERT_EQ(3, threeP.getThreadCount());
} }
} }
TEST(ECThreadPool, easyWakeAndWait) {
std::atomic_int data;
data.store(0);
{
OneThreadPool oneP;
for(unsigned int i = 0; i < 20; ++i) {
oneP.queueFn([] (void *ud) {
auto *atomicInt = static_cast<std::atomic_int*>(ud);
atomicInt->fetch_add(1);
}, &data);
}
oneP.easyWakeAndWait();
EXPECT_EQ(20, data.load());
}
{
ThreeThreadPool threeP;
for(unsigned int i = 0; i < 20; ++i) {
threeP.queueFn([] (void *ud) {
auto *atomicInt = static_cast<std::atomic_int*>(ud);
atomicInt->fetch_add(1);
}, &data);
}
threeP.easyWakeAndWait();
EXPECT_EQ(40, data.load());
}
}