Add atomic_bool to ThreadPool to ensure validity

The new deque<atomic_bool> ensures that entries are not dropped from the
internal deques until the threads holding pointers to them have finished execution.

Stephen Seo 2022-06-16 12:37:09 +09:00
parent 62600cbfa6
commit a879e0ef8c

@@ -28,8 +28,9 @@ using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;
 using ThreadStacksType = std::deque<ThreadStackType>;
 using ThreadStacksMutexesT = std::deque<std::mutex>;
 using ThreadCountersT = std::deque<std::atomic_uint>;
+using PtrsHoldT = std::deque<std::atomic_bool>;
 using PointersT =
-    std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *>;
+    std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *, std::atomic_bool *>;
 }  // namespace Internal
 
 /*!
@@ -147,7 +148,7 @@ class ThreadPool {
     } else {
       sequentiallyRunTasks();
     }
-    return {nullptr, nullptr, nullptr};
+    return {nullptr, nullptr, nullptr, nullptr};
   }
 
   /*!
@@ -184,6 +185,7 @@ class ThreadPool {
       {
         std::lock_guard<std::mutex> lock(*std::get<1>(pointers));
         if (std::get<0>(pointers)->empty()) {
+          std::get<3>(pointers)->store(false);
           break;
         }
       }
@@ -219,6 +221,7 @@ class ThreadPool {
   Internal::TPQueueType fnQueue;
   std::mutex queueMutex;
   Internal::ThreadCountersT threadCounters;
+  Internal::PtrsHoldT ptrsHoldBools;
   std::mutex dequesMutex;
 
   void sequentiallyRunTasks() {
@@ -253,9 +256,12 @@ class ThreadPool {
       erased = false;
       {
         std::lock_guard<std::mutex> lock(threadStackMutexes.front());
-        if (threadStacks.front().empty()) {
+        if (ptrsHoldBools.front().load()) {
+          break;
+        } else if (threadStacks.front().empty()) {
           threadStacks.pop_front();
           threadCounters.pop_front();
+          ptrsHoldBools.pop_front();
           erased = true;
         }
       }
@@ -265,7 +271,7 @@ class ThreadPool {
         break;
       }
     } while (!threadStacks.empty() && !threadStackMutexes.empty() &&
-             !threadCounters.empty());
+             !threadCounters.empty() && !ptrsHoldBools.empty());
   }
 
   Internal::PointersT newStackEntry() {
@@ -274,9 +280,11 @@ class ThreadPool {
     threadStackMutexes.emplace_back();
     threadCounters.emplace_back();
     threadCounters.back().store(0);
+    ptrsHoldBools.emplace_back();
+    ptrsHoldBools.back().store(true);
     return {&threadStacks.back(), &threadStackMutexes.back(),
-            &threadCounters.back()};
+            &threadCounters.back(), &ptrsHoldBools.back()};
   }
 };