From de2848004f007fffd7070428c55260b4de00a812 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Thu, 11 Jan 2024 19:40:15 +0900 Subject: [PATCH] Enforce thread-safety on destroy context --- src/UDPC_Defines.hpp | 3 ++- src/UDPConnection.cpp | 55 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/UDPC_Defines.hpp b/src/UDPC_Defines.hpp index 4a6df9f..7369271 100644 --- a/src/UDPC_Defines.hpp +++ b/src/UDPC_Defines.hpp @@ -225,7 +225,7 @@ public: char recvBuf[UDPC_PACKET_MAX_SIZE]; /* - * 0 - UNUSED + * 0 - is destucting * 1 - is client * 2 - libsodium enabled */ @@ -277,6 +277,7 @@ public: std::uint32_t atostrBufIndex; std::mutex setThreadedUpdateMutex; + std::atomic_uint32_t enableDisableFuncRunningCount; }; // struct Context diff --git a/src/UDPConnection.cpp b/src/UDPConnection.cpp index 1c049b7..37dab7f 100644 --- a/src/UDPConnection.cpp +++ b/src/UDPConnection.cpp @@ -258,7 +258,8 @@ threadedSleepTime(std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT)), keysSet(), atostrBufIndexMutex(), atostrBufIndex(0), -setThreadedUpdateMutex() +setThreadedUpdateMutex(), +enableDisableFuncRunningCount(0) { std::memset(atostrBuf, 0, UDPC_ATOSTR_SIZE); @@ -268,6 +269,8 @@ setThreadedUpdateMutex() isAutoUpdating.store(false); } + flags.reset(0); + rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count()); threadRunning.store(true); @@ -2212,9 +2215,12 @@ int UDPC_enable_threaded_update(UDPC_HContext ctx) { return 0; } + c->enableDisableFuncRunningCount.fetch_add(1); + std::lock_guard setThreadedLock(c->setThreadedUpdateMutex); - if(c->isAutoUpdating.load() || c->thread.joinable()) { + if(c->flags.test(0) || c->isAutoUpdating.load() || c->thread.joinable()) { + c->enableDisableFuncRunningCount.fetch_sub(1); return 0; } @@ -2224,6 +2230,9 @@ int UDPC_enable_threaded_update(UDPC_HContext ctx) { c->thread = std::thread(UDPC::threadedUpdate, c); UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update"); + + c->enableDisableFuncRunningCount.fetch_sub(1); + return 1; } @@ -2233,9 +2242,12 @@ int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) { return 0; } + c->enableDisableFuncRunningCount.fetch_add(1); + std::lock_guard setThreadedLock(c->setThreadedUpdateMutex); - if(c->isAutoUpdating.load() || c->thread.joinable()) { + if(c->flags.test(0) || c->isAutoUpdating.load() || c->thread.joinable()) { + c->enableDisableFuncRunningCount.fetch_sub(1); return 0; } @@ -2251,6 +2263,9 @@ int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) { c->thread = std::thread(UDPC::threadedUpdate, c); UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update"); + + c->enableDisableFuncRunningCount.fetch_sub(1); + return 1; } @@ -2260,9 +2275,12 @@ int UDPC_disable_threaded_update(UDPC_HContext ctx) { return 0; } + c->enableDisableFuncRunningCount.fetch_add(1); + std::lock_guard setThreadedLock(c->setThreadedUpdateMutex); - if(!c->isAutoUpdating.load() || !c->thread.joinable()) { + if(c->flags.test(0) || !c->isAutoUpdating.load() || !c->thread.joinable()) { + c->enableDisableFuncRunningCount.fetch_sub(1); return 0; } @@ -2271,6 +2289,9 @@ int UDPC_disable_threaded_update(UDPC_HContext ctx) { c->isAutoUpdating.store(false); UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Stopped threaded update"); + + c->enableDisableFuncRunningCount.fetch_sub(1); + return 1; } @@ -2281,12 +2302,32 @@ int UDPC_is_valid_context(UDPC_HContext ctx) { void UDPC_destroy(UDPC_HContext ctx) { UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx); if(UDPC_ctx) { - // stop thread if threaded - if(UDPC_ctx->isAutoUpdating.load()) { + { + // Acquire lock so that this code does not run at the same time as + // enabling/disabling threaded-update. + std::lock_guard + setThreadedLock(UDPC_ctx->setThreadedUpdateMutex); + + // Stop thread if threaded. + // Set atomic bool to false always so that the thread will always + // stop at this point. UDPC_ctx->threadRunning.store(false); - UDPC_ctx->thread.join(); + if(UDPC_ctx->isAutoUpdating.load() && UDPC_ctx->thread.joinable()) { + UDPC_ctx->thread.join(); + } + UDPC_ctx->isAutoUpdating.store(false); + + // Set destructing flag. + UDPC_ctx->flags.set(0); + + // Drop lock at this point before destructing the context. } + // After lock has been dropped, wait in case there are enable/disable + // threaded update functions waiting on the lock. Do this via a + // atomic-int-based spin-lock. + while(UDPC_ctx->enableDisableFuncRunningCount.load() != 0) {} + #if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS WSACleanup(); #endif