diff --git a/src/EC/Manager.hpp b/src/EC/Manager.hpp index 6a5333b..89e11c3 100644 --- a/src/EC/Manager.hpp +++ b/src/EC/Manager.hpp @@ -771,11 +771,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } } @@ -892,11 +888,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } } @@ -1037,12 +1029,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for( - std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } }))); @@ -1117,11 +1104,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } return matchingV; @@ -1494,11 +1477,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } // call functions on matching entities @@ -1560,12 +1539,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for( - std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } } ); @@ -1703,11 +1677,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } // call functions on matching entities @@ -1774,12 +1744,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for( - std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } } ); @@ -1857,11 +1822,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } } @@ -1950,11 +1911,7 @@ namespace EC } }, &fnDataAr[i]); } - threadPool->wakeThreads(); - do { - std::this_thread::sleep_for(std::chrono::microseconds(200)); - } while(!threadPool->isQueueEmpty() - || !threadPool->isAllThreadsWaiting()); + threadPool->easyWakeAndWait(); } } }; diff --git a/src/EC/ThreadPool.hpp b/src/EC/ThreadPool.hpp index 3b68c6b..e0dd21f 100644 --- a/src/EC/ThreadPool.hpp +++ b/src/EC/ThreadPool.hpp @@ -117,24 +117,7 @@ public: cv.notify_one(); } } else { - // pull functions from queue and run them on main thread - Internal::TPTupleType fnTuple; - bool hasFn; - do { - { - std::lock_guard lock(queueMutex); - if(!fnQueue.empty()) { - hasFn = true; - fnTuple = fnQueue.front(); - fnQueue.pop(); - } else { - hasFn = false; - } - } - if(hasFn) { - std::get<0>(fnTuple)(std::get<1>(fnTuple)); - } - } while(hasFn); + sequentiallyRunTasks(); } } @@ -179,6 +162,26 @@ public: return SIZE; } + /*! + \brief Wakes all threads and blocks until all queued tasks are finished. + + If SIZE is less than 2, then this function call will block until all the + queued functions have been executed on the calling thread. + + If SIZE is 2 or greater, then this function will block until all the + queued functions have been executed by the threads in the thread pool. + */ + void easyWakeAndWait() { + if(SIZE >= 2) { + wakeThreads(); + do { + std::this_thread::sleep_for(std::chrono::microseconds(150)); + } while(!isQueueEmpty() || !isAllThreadsWaiting()); + } else { + sequentiallyRunTasks(); + } + } + private: std::vector threads; std::atomic_bool isAlive; @@ -189,6 +192,27 @@ private: int waitCount; std::mutex waitCountMutex; + void sequentiallyRunTasks() { + // pull functions from queue and run them on current thread + Internal::TPTupleType fnTuple; + bool hasFn; + do { + { + std::lock_guard lock(queueMutex); + if(!fnQueue.empty()) { + hasFn = true; + fnTuple = fnQueue.front(); + fnQueue.pop(); + } else { + hasFn = false; + } + } + if(hasFn) { + std::get<0>(fnTuple)(std::get<1>(fnTuple)); + } + } while(hasFn); + } + }; } // namespace EC diff --git a/src/test/ThreadPoolTest.cpp b/src/test/ThreadPoolTest.cpp index 5a1a51d..9ebcbe1 100644 --- a/src/test/ThreadPoolTest.cpp +++ b/src/test/ThreadPoolTest.cpp @@ -77,3 +77,30 @@ TEST(ECThreadPool, QueryCount) { ASSERT_EQ(3, threeP.getThreadCount()); } } + +TEST(ECThreadPool, easyWakeAndWait) { + std::atomic_int data; + data.store(0); + { + OneThreadPool oneP; + for(unsigned int i = 0; i < 20; ++i) { + oneP.queueFn([] (void *ud) { + auto *atomicInt = static_cast(ud); + atomicInt->fetch_add(1); + }, &data); + } + oneP.easyWakeAndWait(); + EXPECT_EQ(20, data.load()); + } + { + ThreeThreadPool threeP; + for(unsigned int i = 0; i < 20; ++i) { + threeP.queueFn([] (void *ud) { + auto *atomicInt = static_cast(ud); + atomicInt->fetch_add(1); + }, &data); + } + threeP.easyWakeAndWait(); + EXPECT_EQ(40, data.load()); + } +}