]> git.seodisparate.com - EntityComponentMetaSystem/commitdiff
Add atomic_bool to ThreadPool to ensure validity
authorStephen Seo <seo.disparate@gmail.com>
Thu, 16 Jun 2022 03:37:09 +0000 (12:37 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Thu, 16 Jun 2022 03:37:09 +0000 (12:37 +0900)
The new deque<atomic_bool> ensures that values are not dropped from the
deque until the entry's threads have finished execution.

src/EC/ThreadPool.hpp

index 003a61712a3f2cf1c7e183710c0bc14aa5876ad9..b85c69859e6cb8a93ba6a1702160d98235ae3249 100644 (file)
@@ -28,8 +28,9 @@ using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;
 using ThreadStacksType = std::deque<ThreadStackType>;
 using ThreadStacksMutexesT = std::deque<std::mutex>;
 using ThreadCountersT = std::deque<std::atomic_uint>;
+using PtrsHoldT = std::deque<std::atomic_bool>;
 using PointersT =
-    std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *>;
+    std::tuple<ThreadStackType *, std::mutex *, std::atomic_uint *, std::atomic_bool *>;
 }  // namespace Internal
 
 /*!
@@ -147,7 +148,7 @@ class ThreadPool {
         } else {
             sequentiallyRunTasks();
         }
-        return {nullptr, nullptr, nullptr};
+        return {nullptr, nullptr, nullptr, nullptr};
     }
 
     /*!
@@ -184,6 +185,7 @@ class ThreadPool {
                     {
                         std::lock_guard<std::mutex> lock(*std::get<1>(pointers));
                         if (std::get<0>(pointers)->empty()) {
+                            std::get<3>(pointers)->store(false);
                             break;
                         }
                     }
@@ -219,6 +221,7 @@ class ThreadPool {
     Internal::TPQueueType fnQueue;
     std::mutex queueMutex;
     Internal::ThreadCountersT threadCounters;
+    Internal::PtrsHoldT ptrsHoldBools;
     std::mutex dequesMutex;
 
     void sequentiallyRunTasks() {
@@ -253,9 +256,12 @@ class ThreadPool {
             erased = false;
             {
                 std::lock_guard<std::mutex> lock(threadStackMutexes.front());
-                if (threadStacks.front().empty()) {
+                if (ptrsHoldBools.front().load()) {
+                    break;
+                } else if (threadStacks.front().empty()) {
                     threadStacks.pop_front();
                     threadCounters.pop_front();
+                    ptrsHoldBools.pop_front();
                     erased = true;
                 }
             }
@@ -265,7 +271,7 @@ class ThreadPool {
                 break;
             }
         } while (!threadStacks.empty() && !threadStackMutexes.empty() &&
-                 !threadCounters.empty());
+                 !threadCounters.empty() && !ptrsHoldBools.empty());
     }
 
     Internal::PointersT newStackEntry() {
@@ -274,9 +280,11 @@ class ThreadPool {
         threadStackMutexes.emplace_back();
         threadCounters.emplace_back();
         threadCounters.back().store(0);
+        ptrsHoldBools.emplace_back();
+        ptrsHoldBools.back().store(true);
 
         return {&threadStacks.back(), &threadStackMutexes.back(),
-                &threadCounters.back()};
+                &threadCounters.back(), &ptrsHoldBools.back()};
     }
 };