Compare commits

..

10 commits

Author SHA1 Message Date
04c9f52997 Tweak compare_exchange_weak(...) in spin-lock 2024-01-11 20:18:10 +09:00
c72a337f77 Fix invalid use of mutex in TSLQueue
Mutex was removed in favor of the custom SharedSpinLock.
2024-01-11 20:18:10 +09:00
beb08b74d5 Don't fail on "try" fns if failed to get spinLock
In SharedSpinLock: Only fail on "try" fns after spinLock was acquired
and condition is not met.
2024-01-11 20:18:10 +09:00
e87cedf38b Refactor shared-spin-lock to use atomic "spinLock" 2024-01-11 20:18:10 +09:00
f0ac3449d8 Minor refactorings 2024-01-11 20:18:10 +09:00
bb14643d2a Reland C++11 "shared_lock" with iter remove fix
On iterator remove, the iterator will trade the read lock for a write
lock, and trade back for a read lock once the remove has been completed.
2024-01-11 20:18:10 +09:00
c03eae1c15 UnitTest for start/stop threaded update and fix
All checks were successful
Publish doxygen documentation to seodisparate.com / doxygen-gen-and-publish (push) Successful in 0s
2024-01-11 20:07:25 +09:00
de2848004f Enforce thread-safety on destroy context 2024-01-11 19:40:15 +09:00
186f2edf0f Use mutex when enabling/disabling threaded-update 2024-01-11 19:00:27 +09:00
74341e83d4 Use atomic_bool for auto-updating "flag" 2024-01-11 18:45:14 +09:00
3 changed files with 109 additions and 16 deletions

View file

@ -225,13 +225,14 @@ public:
char recvBuf[UDPC_PACKET_MAX_SIZE]; char recvBuf[UDPC_PACKET_MAX_SIZE];
/* /*
* 0 - is threaded * 0 - is destucting
* 1 - is client * 1 - is client
* 2 - libsodium enabled * 2 - libsodium enabled
*/ */
std::bitset<8> flags; std::bitset<8> flags;
std::atomic_bool isAcceptNewConnections; std::atomic_bool isAcceptNewConnections;
std::atomic_bool isReceivingEvents; std::atomic_bool isReceivingEvents;
std::atomic_bool isAutoUpdating;
std::atomic_uint32_t protocolID; std::atomic_uint32_t protocolID;
std::atomic_uint_fast8_t loggingType; std::atomic_uint_fast8_t loggingType;
// See UDPC_AuthPolicy enum in UDPC.h for possible values // See UDPC_AuthPolicy enum in UDPC.h for possible values
@ -275,6 +276,9 @@ public:
std::mutex atostrBufIndexMutex; std::mutex atostrBufIndexMutex;
std::uint32_t atostrBufIndex; std::uint32_t atostrBufIndex;
std::mutex setThreadedUpdateMutex;
std::atomic_uint32_t enableDisableFuncRunningCount;
}; // struct Context }; // struct Context
Context *verifyContext(UDPC_HContext ctx); Context *verifyContext(UDPC_HContext ctx);

View file

@ -222,6 +222,7 @@ _contextIdentifier(UDPC_CONTEXT_IDENTIFIER),
flags(), flags(),
isAcceptNewConnections(true), isAcceptNewConnections(true),
isReceivingEvents(false), isReceivingEvents(false),
isAutoUpdating(false),
protocolID(UDPC_DEFAULT_PROTOCOL_ID), protocolID(UDPC_DEFAULT_PROTOCOL_ID),
#ifndef NDEBUG #ifndef NDEBUG
loggingType(UDPC_DEBUG), loggingType(UDPC_DEBUG),
@ -256,16 +257,20 @@ peerPKWhitelistMutex(),
threadedSleepTime(std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT)), threadedSleepTime(std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT)),
keysSet(), keysSet(),
atostrBufIndexMutex(), atostrBufIndexMutex(),
atostrBufIndex(0) atostrBufIndex(0),
setThreadedUpdateMutex(),
enableDisableFuncRunningCount(0)
{ {
std::memset(atostrBuf, 0, UDPC_ATOSTR_SIZE); std::memset(atostrBuf, 0, UDPC_ATOSTR_SIZE);
if(isThreaded) { if(isThreaded) {
flags.set(0); isAutoUpdating.store(true);
} else { } else {
flags.reset(0); isAutoUpdating.store(false);
} }
flags.reset(0);
rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count()); rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count());
threadRunning.store(true); threadRunning.store(true);
@ -2171,7 +2176,7 @@ UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId,
return nullptr; return nullptr;
} }
ctx->flags.set(0); ctx->isAutoUpdating.store(true);
ctx->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT); ctx->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT);
ctx->thread = std::thread(UDPC::threadedUpdate, ctx); ctx->thread = std::thread(UDPC::threadedUpdate, ctx);
@ -2189,7 +2194,7 @@ UDPC_HContext UDPC_init_threaded_update_ms(
return nullptr; return nullptr;
} }
ctx->flags.set(0); ctx->isAutoUpdating.store(true);
if(updateMS < UDPC_UPDATE_MS_MIN) { if(updateMS < UDPC_UPDATE_MS_MIN) {
ctx->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_MIN); ctx->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_MIN);
} else if(updateMS > UDPC_UPDATE_MS_MAX) { } else if(updateMS > UDPC_UPDATE_MS_MAX) {
@ -2206,26 +2211,47 @@ UDPC_HContext UDPC_init_threaded_update_ms(
int UDPC_enable_threaded_update(UDPC_HContext ctx) { int UDPC_enable_threaded_update(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c || c->flags.test(0) || c->thread.joinable()) { if (!c) {
return 0; return 0;
} }
c->flags.set(0); c->enableDisableFuncRunningCount.fetch_add(1);
std::lock_guard<std::mutex> setThreadedLock(c->setThreadedUpdateMutex);
if(c->flags.test(0) || c->isAutoUpdating.load() || c->thread.joinable()) {
c->enableDisableFuncRunningCount.fetch_sub(1);
return 0;
}
c->isAutoUpdating.store(true);
c->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT); c->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT);
c->threadRunning.store(true); c->threadRunning.store(true);
c->thread = std::thread(UDPC::threadedUpdate, c); c->thread = std::thread(UDPC::threadedUpdate, c);
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update"); UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update");
c->enableDisableFuncRunningCount.fetch_sub(1);
return 1; return 1;
} }
int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) { int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c || c->flags.test(0) || c->thread.joinable()) { if (!c) {
return 0; return 0;
} }
c->flags.set(0); c->enableDisableFuncRunningCount.fetch_add(1);
std::lock_guard<std::mutex> setThreadedLock(c->setThreadedUpdateMutex);
if(c->flags.test(0) || c->isAutoUpdating.load() || c->thread.joinable()) {
c->enableDisableFuncRunningCount.fetch_sub(1);
return 0;
}
c->isAutoUpdating.store(true);
if(updateMS < UDPC_UPDATE_MS_MIN) { if(updateMS < UDPC_UPDATE_MS_MIN) {
c->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_MIN); c->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_MIN);
} else if(updateMS > UDPC_UPDATE_MS_MAX) { } else if(updateMS > UDPC_UPDATE_MS_MAX) {
@ -2237,20 +2263,35 @@ int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) {
c->thread = std::thread(UDPC::threadedUpdate, c); c->thread = std::thread(UDPC::threadedUpdate, c);
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update"); UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update");
c->enableDisableFuncRunningCount.fetch_sub(1);
return 1; return 1;
} }
int UDPC_disable_threaded_update(UDPC_HContext ctx) { int UDPC_disable_threaded_update(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c || !c->flags.test(0) || !c->thread.joinable()) { if (!c) {
return 0;
}
c->enableDisableFuncRunningCount.fetch_add(1);
std::lock_guard<std::mutex> setThreadedLock(c->setThreadedUpdateMutex);
if(c->flags.test(0) || !c->isAutoUpdating.load() || !c->thread.joinable()) {
c->enableDisableFuncRunningCount.fetch_sub(1);
return 0; return 0;
} }
c->threadRunning.store(false); c->threadRunning.store(false);
c->thread.join(); c->thread.join();
c->flags.reset(0); c->isAutoUpdating.store(false);
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Stopped threaded update"); UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Stopped threaded update");
c->enableDisableFuncRunningCount.fetch_sub(1);
return 1; return 1;
} }
@ -2261,10 +2302,32 @@ int UDPC_is_valid_context(UDPC_HContext ctx) {
void UDPC_destroy(UDPC_HContext ctx) { void UDPC_destroy(UDPC_HContext ctx) {
UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx); UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx);
if(UDPC_ctx) { if(UDPC_ctx) {
// stop thread if threaded {
if(UDPC_ctx->flags.test(0)) { // Acquire lock so that this code does not run at the same time as
// enabling/disabling threaded-update.
std::lock_guard<std::mutex>
setThreadedLock(UDPC_ctx->setThreadedUpdateMutex);
// Stop thread if threaded.
// Set atomic bool to false always so that the thread will always
// stop at this point.
UDPC_ctx->threadRunning.store(false); UDPC_ctx->threadRunning.store(false);
UDPC_ctx->thread.join(); if(UDPC_ctx->isAutoUpdating.load() && UDPC_ctx->thread.joinable()) {
UDPC_ctx->thread.join();
}
UDPC_ctx->isAutoUpdating.store(false);
// Set destructing flag.
UDPC_ctx->flags.set(0);
// Drop lock at this point before destructing the context.
}
// After lock has been dropped, wait in case there are enable/disable
// threaded update functions waiting on the lock. Do this via a
// atomic-int-based spin-lock.
while(UDPC_ctx->enableDisableFuncRunningCount.load() != 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
#if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS #if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS
@ -2277,7 +2340,7 @@ void UDPC_destroy(UDPC_HContext ctx) {
void UDPC_update(UDPC_HContext ctx) { void UDPC_update(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c || c->flags.test(0)) { if(!c || c->isAutoUpdating.load()) {
// invalid or is threaded, update should not be called // invalid or is threaded, update should not be called
return; return;
} }

View file

@ -434,3 +434,29 @@ TEST(UDPC, free_packet_ptr) {
UDPC_free_PacketInfo_ptr(&pinfo); UDPC_free_PacketInfo_ptr(&pinfo);
UDPC_free_PacketInfo_ptr(nullptr); UDPC_free_PacketInfo_ptr(nullptr);
} }
TEST(UDPC, enableDisableThreadedUpdate_StressTest) {
UDPC_ConnectionId id = UDPC_create_id_anyaddr(0);
UDPC_HContext ctx = UDPC_init(id, 0, 0);
std::array<std::thread, 100> thread_array;
for (int i = 0; i < 100; ++i) {
if (i % 2 == 0) {
thread_array[i] = std::thread([] (UDPC_HContext ctx) {
UDPC_enable_threaded_update(ctx);
}, ctx);
} else {
thread_array[i] = std::thread([] (UDPC_HContext ctx) {
UDPC_disable_threaded_update(ctx);
}, ctx);
}
}
thread_array[0].join();
UDPC_destroy(ctx);
for (int i = 1; i < 100; ++i) {
thread_array[i].join();
}
}