}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
}
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
}
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(
- std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
})));
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
return matchingV;
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
// call functions on matching entities
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(
- std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
}
);
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
// call functions on matching entities
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(
- std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
}
);
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
}
}
}, &fnDataAr[i]);
}
- threadPool->wakeThreads();
- do {
- std::this_thread::sleep_for(std::chrono::microseconds(200));
- } while(!threadPool->isQueueEmpty()
- || !threadPool->isAllThreadsWaiting());
+ threadPool->easyWakeAndWait();
}
}
};
cv.notify_one();
}
} else {
- // pull functions from queue and run them on main thread
- Internal::TPTupleType fnTuple;
- bool hasFn;
- do {
- {
- std::lock_guard<std::mutex> lock(queueMutex);
- if(!fnQueue.empty()) {
- hasFn = true;
- fnTuple = fnQueue.front();
- fnQueue.pop();
- } else {
- hasFn = false;
- }
- }
- if(hasFn) {
- std::get<0>(fnTuple)(std::get<1>(fnTuple));
- }
- } while(hasFn);
+ sequentiallyRunTasks();
}
}
return SIZE;
}
+ /*!
+ \brief Wakes all threads and blocks until all queued tasks are finished.
+
+ If SIZE is less than 2, then this function call will block until all the
+ queued functions have been executed on the calling thread.
+
+ If SIZE is 2 or greater, then this function will block until all the
+ queued functions have been executed by the threads in the thread pool.
+ */
+ void easyWakeAndWait() {
+ if(SIZE >= 2) {
+ wakeThreads();
+ do {
+ std::this_thread::sleep_for(std::chrono::microseconds(150));
+ } while(!isQueueEmpty() || !isAllThreadsWaiting());
+ } else {
+ sequentiallyRunTasks();
+ }
+ }
+
private:
std::vector<std::thread> threads;
std::atomic_bool isAlive;
int waitCount;
std::mutex waitCountMutex;
+ void sequentiallyRunTasks() {
+ // pull functions from queue and run them on current thread
+ Internal::TPTupleType fnTuple;
+ bool hasFn;
+ do {
+ {
+ std::lock_guard<std::mutex> lock(queueMutex);
+ if(!fnQueue.empty()) {
+ hasFn = true;
+ fnTuple = fnQueue.front();
+ fnQueue.pop();
+ } else {
+ hasFn = false;
+ }
+ }
+ if(hasFn) {
+ std::get<0>(fnTuple)(std::get<1>(fnTuple));
+ }
+ } while(hasFn);
+ }
+
};
} // namespace EC
ASSERT_EQ(3, threeP.getThreadCount());
}
}
+
+TEST(ECThreadPool, easyWakeAndWait) {
+ std::atomic_int data;
+ data.store(0);
+ {
+ OneThreadPool oneP;
+ for(unsigned int i = 0; i < 20; ++i) {
+ oneP.queueFn([] (void *ud) {
+ auto *atomicInt = static_cast<std::atomic_int*>(ud);
+ atomicInt->fetch_add(1);
+ }, &data);
+ }
+ oneP.easyWakeAndWait();
+ EXPECT_EQ(20, data.load());
+ }
+ {
+ ThreeThreadPool threeP;
+ for(unsigned int i = 0; i < 20; ++i) {
+ threeP.queueFn([] (void *ud) {
+ auto *atomicInt = static_cast<std::atomic_int*>(ud);
+ atomicInt->fetch_add(1);
+ }, &data);
+ }
+ threeP.easyWakeAndWait();
+ EXPECT_EQ(40, data.load());
+ }
+}