]> git.seodisparate.com - EntityComponentMetaSystem/commitdiff
Add easyWakeAndWait() to ThreadPool
authorStephen Seo <seo.disparate@gmail.com>
Thu, 9 Sep 2021 06:53:55 +0000 (15:53 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Thu, 9 Sep 2021 06:53:55 +0000 (15:53 +0900)
Changed usage of ThreadPool in EC::Manager to use easyWakeAndWait().

src/EC/Manager.hpp
src/EC/ThreadPool.hpp
src/test/ThreadPoolTest.cpp

index 6a5333b8ee2110ab532416cec6a09a56dbf30abd..89e11c3c17454c8af09807e99bf0094f6793efc3 100644 (file)
@@ -771,11 +771,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
         }
 
@@ -892,11 +888,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
         }
 
@@ -1037,12 +1029,7 @@ namespace EC
                                 }
                             }, &fnDataAr[i]);
                         }
-                        threadPool->wakeThreads();
-                        do {
-                            std::this_thread::sleep_for(
-                                    std::chrono::microseconds(200));
-                        } while(!threadPool->isQueueEmpty()
-                            || !threadPool->isAllThreadsWaiting());
+                        threadPool->easyWakeAndWait();
                     }
                 })));
 
@@ -1117,11 +1104,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
 
             return matchingV;
@@ -1494,11 +1477,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
 
             // call functions on matching entities
@@ -1560,12 +1539,7 @@ namespace EC
                                 }
                             }, &fnDataAr[i]);
                         }
-                        threadPool->wakeThreads();
-                        do {
-                            std::this_thread::sleep_for(
-                                std::chrono::microseconds(200));
-                        } while(!threadPool->isQueueEmpty()
-                            || !threadPool->isAllThreadsWaiting());
+                        threadPool->easyWakeAndWait();
                     }
                 }
             );
@@ -1703,11 +1677,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
 
             // call functions on matching entities
@@ -1774,12 +1744,7 @@ namespace EC
                                 }
                             }, &fnDataAr[i]);
                         }
-                        threadPool->wakeThreads();
-                        do {
-                            std::this_thread::sleep_for(
-                                std::chrono::microseconds(200));
-                        } while(!threadPool->isQueueEmpty()
-                            || !threadPool->isAllThreadsWaiting());
+                        threadPool->easyWakeAndWait();
                     }
                 }
             );
@@ -1857,11 +1822,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
         }
 
@@ -1950,11 +1911,7 @@ namespace EC
                         }
                     }, &fnDataAr[i]);
                 }
-                threadPool->wakeThreads();
-                do {
-                    std::this_thread::sleep_for(std::chrono::microseconds(200));
-                } while(!threadPool->isQueueEmpty()
-                    || !threadPool->isAllThreadsWaiting());
+                threadPool->easyWakeAndWait();
             }
         }
     };
index 3b68c6b28e7d09832862f4ef38a328450f9d0c48..e0dd21f7b5c758cf6539372186d777ca1a97fa88 100644 (file)
@@ -117,24 +117,7 @@ public:
                 cv.notify_one();
             }
         } else {
-            // pull functions from queue and run them on main thread
-            Internal::TPTupleType fnTuple;
-            bool hasFn;
-            do {
-                {
-                    std::lock_guard<std::mutex> lock(queueMutex);
-                    if(!fnQueue.empty()) {
-                        hasFn = true;
-                        fnTuple = fnQueue.front();
-                        fnQueue.pop();
-                    } else {
-                        hasFn = false;
-                    }
-                }
-                if(hasFn) {
-                    std::get<0>(fnTuple)(std::get<1>(fnTuple));
-                }
-            } while(hasFn);
+            sequentiallyRunTasks();
         }
     }
 
@@ -179,6 +162,26 @@ public:
         return SIZE;
     }
 
+    /*!
+        \brief Wakes all threads and blocks until all queued tasks are finished.
+
+        If SIZE is less than 2, then this function call will block until all the
+        queued functions have been executed on the calling thread.
+
+        If SIZE is 2 or greater, then this function will block until all the
+        queued functions have been executed by the threads in the thread pool.
+     */
+    void easyWakeAndWait() {
+        if(SIZE >= 2) {
+            wakeThreads();
+            do {
+                std::this_thread::sleep_for(std::chrono::microseconds(150));
+            } while(!isQueueEmpty() || !isAllThreadsWaiting());
+        } else {
+            sequentiallyRunTasks();
+        }
+    }
+
 private:
     std::vector<std::thread> threads;
     std::atomic_bool isAlive;
@@ -189,6 +192,27 @@ private:
     int waitCount;
     std::mutex waitCountMutex;
 
+    void sequentiallyRunTasks() {
+        // pull functions from queue and run them on current thread
+        Internal::TPTupleType fnTuple;
+        bool hasFn;
+        do {
+            {
+                std::lock_guard<std::mutex> lock(queueMutex);
+                if(!fnQueue.empty()) {
+                    hasFn = true;
+                    fnTuple = fnQueue.front();
+                    fnQueue.pop();
+                } else {
+                    hasFn = false;
+                }
+            }
+            if(hasFn) {
+                std::get<0>(fnTuple)(std::get<1>(fnTuple));
+            }
+        } while(hasFn);
+    }
+
 };
 
 } // namespace EC
index 5a1a51d64709a7149fe6f094ab80ce71d18dd421..9ebcbe18d21ebd5233f787a8095a0f44dbe4211a 100644 (file)
@@ -77,3 +77,30 @@ TEST(ECThreadPool, QueryCount) {
         ASSERT_EQ(3, threeP.getThreadCount());
     }
 }
+
+TEST(ECThreadPool, easyWakeAndWait) {
+    std::atomic_int data;
+    data.store(0);
+    {
+        OneThreadPool oneP;
+        for(unsigned int i = 0; i < 20; ++i) {
+            oneP.queueFn([] (void *ud) {
+                auto *atomicInt = static_cast<std::atomic_int*>(ud);
+                atomicInt->fetch_add(1);
+            }, &data);
+        }
+        oneP.easyWakeAndWait();
+        EXPECT_EQ(20, data.load());
+    }
+    {
+        ThreeThreadPool threeP;
+        for(unsigned int i = 0; i < 20; ++i) {
+            threeP.queueFn([] (void *ud) {
+                auto *atomicInt = static_cast<std::atomic_int*>(ud);
+                atomicInt->fetch_add(1);
+            }, &data);
+        }
+        threeP.easyWakeAndWait();
+        EXPECT_EQ(40, data.load());
+    }
+}