Compare commits
10 commits
cdf2999256
...
04c9f52997
Author | SHA1 | Date | |
---|---|---|---|
04c9f52997 | |||
c72a337f77 | |||
beb08b74d5 | |||
e87cedf38b | |||
f0ac3449d8 | |||
bb14643d2a | |||
c03eae1c15 | |||
de2848004f | |||
186f2edf0f | |||
74341e83d4 |
8 changed files with 595 additions and 46 deletions
|
@ -5,6 +5,7 @@ set(UDPC_VERSION 1.0)
|
|||
|
||||
set(UDPC_SOURCES
|
||||
src/UDPConnection.cpp
|
||||
src/CXX11_shared_spin_lock.cpp
|
||||
)
|
||||
|
||||
add_compile_options(
|
||||
|
@ -62,6 +63,7 @@ if(CMAKE_BUILD_TYPE MATCHES "Debug")
|
|||
find_package(GTest QUIET)
|
||||
if(GTEST_FOUND)
|
||||
set(UDPC_UnitTest_SOURCES
|
||||
src/CXX11_shared_spin_lock.cpp
|
||||
src/test/UDPC_UnitTest.cpp
|
||||
src/test/TestTSLQueue.cpp
|
||||
src/test/TestUDPC.cpp
|
||||
|
|
218
src/CXX11_shared_spin_lock.cpp
Normal file
218
src/CXX11_shared_spin_lock.cpp
Normal file
|
@ -0,0 +1,218 @@
|
|||
#include "CXX11_shared_spin_lock.hpp"
|
||||
|
||||
UDPC::Badge UDPC::Badge::newInvalid() {
|
||||
Badge badge;
|
||||
badge.isValid = false;
|
||||
return badge;
|
||||
}
|
||||
|
||||
UDPC::Badge::Badge() :
|
||||
isValid(true)
|
||||
{}
|
||||
|
||||
UDPC::SharedSpinLock::Ptr UDPC::SharedSpinLock::newInstance() {
|
||||
Ptr sharedSpinLock = Ptr(new SharedSpinLock());
|
||||
sharedSpinLock->selfWeakPtr = sharedSpinLock;
|
||||
return sharedSpinLock;
|
||||
}
|
||||
|
||||
UDPC::SharedSpinLock::SharedSpinLock() :
|
||||
selfWeakPtr(),
|
||||
spinLock(false),
|
||||
read(0),
|
||||
write(false)
|
||||
{}
|
||||
|
||||
UDPC::LockObj<false> UDPC::SharedSpinLock::spin_read_lock() {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if(spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (!write) {
|
||||
++read;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<false>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UDPC::LockObj<false> UDPC::SharedSpinLock::try_spin_read_lock() {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (!write) {
|
||||
++read;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<false>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return LockObj<false>{};
|
||||
}
|
||||
|
||||
void UDPC::SharedSpinLock::read_unlock(UDPC::Badge &&badge) {
|
||||
if (badge.isValid) {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (read > 0) {
|
||||
--read;
|
||||
badge.isValid = false;
|
||||
}
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UDPC::LockObj<true> UDPC::SharedSpinLock::spin_write_lock() {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (!write && read == 0) {
|
||||
write = true;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<true>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UDPC::LockObj<true> UDPC::SharedSpinLock::try_spin_write_lock() {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (!write && read == 0) {
|
||||
write = true;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<true>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return LockObj<true>{};
|
||||
}
|
||||
|
||||
void UDPC::SharedSpinLock::write_unlock(UDPC::Badge &&badge) {
|
||||
if (badge.isValid) {
|
||||
bool expected;
|
||||
while(true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (write) {
|
||||
write = false;
|
||||
badge.isValid = false;
|
||||
}
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UDPC::LockObj<false> UDPC::SharedSpinLock::trade_write_for_read_lock(UDPC::LockObj<true> &lockObj) {
|
||||
if (lockObj.isValid() && lockObj.badge.isValid) {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (write && read == 0) {
|
||||
read = 1;
|
||||
write = false;
|
||||
lockObj.isLocked = false;
|
||||
lockObj.badge.isValid = false;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<false>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return LockObj<false>{};
|
||||
}
|
||||
}
|
||||
|
||||
UDPC::LockObj<false> UDPC::SharedSpinLock::try_trade_write_for_read_lock(UDPC::LockObj<true> &lockObj) {
|
||||
if (lockObj.isValid() && lockObj.badge.isValid) {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (write && read == 0) {
|
||||
read = 1;
|
||||
write = false;
|
||||
lockObj.isLocked = false;
|
||||
lockObj.badge.isValid = false;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<false>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return LockObj<false>{};
|
||||
}
|
||||
|
||||
UDPC::LockObj<true> UDPC::SharedSpinLock::trade_read_for_write_lock(UDPC::LockObj<false> &lockObj) {
|
||||
if (lockObj.isValid() && lockObj.badge.isValid) {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (!write && read == 1) {
|
||||
read = 0;
|
||||
write = true;
|
||||
lockObj.isLocked = false;
|
||||
lockObj.badge.isValid = false;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<true>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return LockObj<true>{};
|
||||
}
|
||||
}
|
||||
|
||||
UDPC::LockObj<true> UDPC::SharedSpinLock::try_trade_read_for_write_lock(UDPC::LockObj<false> &lockObj) {
|
||||
if (lockObj.isValid() && lockObj.badge.isValid) {
|
||||
bool expected;
|
||||
while (true) {
|
||||
expected = false;
|
||||
if (spinLock.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
|
||||
if (!write && read == 1) {
|
||||
read = 0;
|
||||
write = true;
|
||||
lockObj.isLocked = false;
|
||||
lockObj.badge.isValid = false;
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
return LockObj<true>(selfWeakPtr, Badge{});
|
||||
} else {
|
||||
spinLock.store(false, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return LockObj<true>{};
|
||||
}
|
156
src/CXX11_shared_spin_lock.hpp
Normal file
156
src/CXX11_shared_spin_lock.hpp
Normal file
|
@ -0,0 +1,156 @@
|
|||
#ifndef UDPC_CXX11_SHARED_SPIN_LOCK_H_
|
||||
#define UDPC_CXX11_SHARED_SPIN_LOCK_H_
|
||||
|
||||
#include <memory>
|
||||
#include <atomic>
|
||||
|
||||
namespace UDPC {
|
||||
|
||||
// Forward declaration for LockObj.
|
||||
class SharedSpinLock;
|
||||
|
||||
class Badge {
|
||||
public:
|
||||
static Badge newInvalid();
|
||||
|
||||
// Disallow copy.
|
||||
Badge(const Badge&) = delete;
|
||||
Badge& operator=(const Badge&) = delete;
|
||||
|
||||
// Allow move.
|
||||
Badge(Badge&&) = default;
|
||||
Badge& operator=(Badge&&) = default;
|
||||
|
||||
private:
|
||||
friend class SharedSpinLock;
|
||||
|
||||
// Can only be created by SharedSpinLock.
|
||||
Badge();
|
||||
|
||||
bool isValid;
|
||||
};
|
||||
|
||||
template <bool IsWriteObj>
|
||||
class LockObj {
|
||||
public:
|
||||
// Invalid instance constructor.
|
||||
LockObj();
|
||||
|
||||
~LockObj();
|
||||
|
||||
// Explicit invalid instance constructor.
|
||||
static LockObj<IsWriteObj> newInvalid();
|
||||
|
||||
// Disallow copy.
|
||||
LockObj(const LockObj&) = delete;
|
||||
LockObj& operator=(const LockObj&) = delete;
|
||||
|
||||
// Allow move.
|
||||
LockObj(LockObj&&) = default;
|
||||
LockObj& operator=(LockObj&&) = default;
|
||||
|
||||
bool isValid() const;
|
||||
|
||||
private:
|
||||
friend class SharedSpinLock;
|
||||
|
||||
// Only can be created by SharedSpinLock.
|
||||
LockObj(Badge &&badge);
|
||||
LockObj(std::weak_ptr<SharedSpinLock> lockPtr, Badge &&badge);
|
||||
|
||||
std::weak_ptr<SharedSpinLock> weakPtrLock;
|
||||
bool isLocked;
|
||||
Badge badge;
|
||||
};
|
||||
|
||||
class SharedSpinLock {
|
||||
public:
|
||||
using Ptr = std::shared_ptr<SharedSpinLock>;
|
||||
using Weak = std::weak_ptr<SharedSpinLock>;
|
||||
|
||||
static Ptr newInstance();
|
||||
|
||||
// Disallow copy.
|
||||
SharedSpinLock(const SharedSpinLock&) = delete;
|
||||
SharedSpinLock& operator=(const SharedSpinLock&) = delete;
|
||||
|
||||
// Disallow move.
|
||||
SharedSpinLock(SharedSpinLock&&) = delete;
|
||||
SharedSpinLock& operator=(SharedSpinLock&&) = delete;
|
||||
|
||||
LockObj<false> spin_read_lock();
|
||||
LockObj<false> try_spin_read_lock();
|
||||
void read_unlock(Badge&&);
|
||||
|
||||
LockObj<true> spin_write_lock();
|
||||
LockObj<true> try_spin_write_lock();
|
||||
void write_unlock(Badge&&);
|
||||
|
||||
LockObj<false> trade_write_for_read_lock(LockObj<true>&);
|
||||
LockObj<false> try_trade_write_for_read_lock(LockObj<true>&);
|
||||
|
||||
LockObj<true> trade_read_for_write_lock(LockObj<false>&);
|
||||
LockObj<true> try_trade_read_for_write_lock(LockObj<false>&);
|
||||
|
||||
private:
|
||||
SharedSpinLock();
|
||||
|
||||
Weak selfWeakPtr;
|
||||
|
||||
/// Used to lock the read/write member variables.
|
||||
volatile std::atomic_bool spinLock;
|
||||
|
||||
unsigned int read;
|
||||
bool write;
|
||||
|
||||
};
|
||||
|
||||
template <bool IsWriteObj>
|
||||
LockObj<IsWriteObj>::LockObj() :
|
||||
weakPtrLock(),
|
||||
isLocked(false),
|
||||
badge(UDPC::Badge::newInvalid())
|
||||
{}
|
||||
|
||||
template <bool IsWriteObj>
|
||||
LockObj<IsWriteObj>::LockObj(Badge &&badge) :
|
||||
weakPtrLock(),
|
||||
isLocked(false),
|
||||
badge(std::forward<Badge>(badge))
|
||||
{}
|
||||
|
||||
template <bool IsWriteObj>
|
||||
LockObj<IsWriteObj>::LockObj(SharedSpinLock::Weak lockPtr, Badge &&badge) :
|
||||
weakPtrLock(lockPtr),
|
||||
isLocked(true),
|
||||
badge(std::forward<Badge>(badge))
|
||||
{}
|
||||
|
||||
template <bool IsWriteObj>
|
||||
LockObj<IsWriteObj>::~LockObj() {
|
||||
if (!isLocked) {
|
||||
return;
|
||||
}
|
||||
auto strongPtrLock = weakPtrLock.lock();
|
||||
if (strongPtrLock) {
|
||||
if (IsWriteObj) {
|
||||
strongPtrLock->write_unlock(std::move(badge));
|
||||
} else {
|
||||
strongPtrLock->read_unlock(std::move(badge));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <bool IsWriteObj>
|
||||
LockObj<IsWriteObj> LockObj<IsWriteObj>::newInvalid() {
|
||||
return LockObj<IsWriteObj>{};
|
||||
}
|
||||
|
||||
template <bool IsWriteObj>
|
||||
bool LockObj<IsWriteObj>::isValid() const {
|
||||
return isLocked;
|
||||
}
|
||||
|
||||
} // namespace UDPC
|
||||
|
||||
#endif
|
122
src/TSLQueue.hpp
122
src/TSLQueue.hpp
|
@ -2,7 +2,6 @@
|
|||
#define UDPC_THREADSAFE_LINKEDLIST_QUEUE_HPP
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <optional>
|
||||
|
@ -10,6 +9,8 @@
|
|||
#include <list>
|
||||
#include <type_traits>
|
||||
|
||||
#include "CXX11_shared_spin_lock.hpp"
|
||||
|
||||
template <typename T>
|
||||
class TSLQueue {
|
||||
public:
|
||||
|
@ -62,7 +63,7 @@ class TSLQueue {
|
|||
|
||||
class TSLQIter {
|
||||
public:
|
||||
TSLQIter(std::mutex *mutex,
|
||||
TSLQIter(UDPC::SharedSpinLock::Weak sharedSpinLockWeak,
|
||||
std::weak_ptr<TSLQNode> currentNode,
|
||||
unsigned long *msize);
|
||||
~TSLQIter();
|
||||
|
@ -75,19 +76,24 @@ class TSLQueue {
|
|||
bool next();
|
||||
bool prev();
|
||||
bool remove();
|
||||
bool try_remove();
|
||||
|
||||
private:
|
||||
std::mutex *mutex;
|
||||
UDPC::SharedSpinLock::Weak sharedSpinLockWeak;
|
||||
std::unique_ptr<UDPC::LockObj<false>> readLock;
|
||||
std::unique_ptr<UDPC::LockObj<true>> writeLock;
|
||||
std::weak_ptr<TSLQNode> currentNode;
|
||||
unsigned long *const msize;
|
||||
|
||||
bool remove_impl();
|
||||
|
||||
};
|
||||
|
||||
public:
|
||||
TSLQIter begin();
|
||||
|
||||
private:
|
||||
std::mutex mutex;
|
||||
UDPC::SharedSpinLock::Ptr sharedSpinLock;
|
||||
std::shared_ptr<TSLQNode> head;
|
||||
std::shared_ptr<TSLQNode> tail;
|
||||
unsigned long msize;
|
||||
|
@ -95,7 +101,7 @@ class TSLQueue {
|
|||
|
||||
template <typename T>
|
||||
TSLQueue<T>::TSLQueue() :
|
||||
mutex(),
|
||||
sharedSpinLock(UDPC::SharedSpinLock::newInstance()),
|
||||
head(std::shared_ptr<TSLQNode>(new TSLQNode())),
|
||||
tail(std::shared_ptr<TSLQNode>(new TSLQNode())),
|
||||
msize(0)
|
||||
|
@ -111,9 +117,14 @@ TSLQueue<T>::~TSLQueue() {
|
|||
}
|
||||
|
||||
template <typename T>
|
||||
TSLQueue<T>::TSLQueue(TSLQueue &&other)
|
||||
TSLQueue<T>::TSLQueue(TSLQueue &&other) :
|
||||
sharedSpinLock(UDPC::SharedSpinLock::newInstance()),
|
||||
head(std::shared_ptr<TSLQNode>(new TSLQNode())),
|
||||
tail(std::shared_ptr<TSLQNode>(new TSLQNode())),
|
||||
msize(0)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(other.mutex);
|
||||
auto selfWriteLock = sharedSpinLock->spin_write_lock();
|
||||
auto otherWriteLock = other.sharedSpinLock->spin_write_lock();
|
||||
head = std::move(other.head);
|
||||
tail = std::move(other.tail);
|
||||
msize = std::move(other.msize);
|
||||
|
@ -121,8 +132,8 @@ TSLQueue<T>::TSLQueue(TSLQueue &&other)
|
|||
|
||||
template <typename T>
|
||||
TSLQueue<T> & TSLQueue<T>::operator=(TSLQueue &&other) {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
std::lock_guard<std::mutex> otherLock(other.mutex);
|
||||
auto selfWriteLock = sharedSpinLock->spin_write_lock();
|
||||
auto otherWriteLock = other.sharedSpinLock->spin_write_lock();
|
||||
head = std::move(other.head);
|
||||
tail = std::move(other.tail);
|
||||
msize = std::move(other.msize);
|
||||
|
@ -130,7 +141,7 @@ TSLQueue<T> & TSLQueue<T>::operator=(TSLQueue &&other) {
|
|||
|
||||
template <typename T>
|
||||
void TSLQueue<T>::push(const T &data) {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto writeLock = sharedSpinLock->spin_write_lock();
|
||||
auto newNode = std::shared_ptr<TSLQNode>(new TSLQNode());
|
||||
newNode->data = std::unique_ptr<T>(new T(data));
|
||||
|
||||
|
@ -146,7 +157,8 @@ void TSLQueue<T>::push(const T &data) {
|
|||
|
||||
template <typename T>
|
||||
bool TSLQueue<T>::push_nb(const T &data) {
|
||||
if(mutex.try_lock()) {
|
||||
auto writeLock = sharedSpinLock->try_spin_write_lock();
|
||||
if(writeLock.isValid()) {
|
||||
auto newNode = std::shared_ptr<TSLQNode>(new TSLQNode());
|
||||
newNode->data = std::unique_ptr<T>(new T(data));
|
||||
|
||||
|
@ -159,7 +171,6 @@ bool TSLQueue<T>::push_nb(const T &data) {
|
|||
tail->prev = newNode;
|
||||
++msize;
|
||||
|
||||
mutex.unlock();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -168,7 +179,7 @@ bool TSLQueue<T>::push_nb(const T &data) {
|
|||
|
||||
template <typename T>
|
||||
std::unique_ptr<T> TSLQueue<T>::top() {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto readLock = sharedSpinLock->spin_read_lock();
|
||||
std::unique_ptr<T> result;
|
||||
if(head->next != tail) {
|
||||
assert(head->next->data);
|
||||
|
@ -181,20 +192,20 @@ std::unique_ptr<T> TSLQueue<T>::top() {
|
|||
template <typename T>
|
||||
std::unique_ptr<T> TSLQueue<T>::top_nb() {
|
||||
std::unique_ptr<T> result;
|
||||
if(mutex.try_lock()) {
|
||||
auto readLock = sharedSpinLock->try_spin_read_lock();
|
||||
if(readLock.isValid()) {
|
||||
if(head->next != tail) {
|
||||
assert(head->next->data);
|
||||
result = std::unique_ptr<T>(new T);
|
||||
*result = *head->next->data;
|
||||
}
|
||||
mutex.unlock();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool TSLQueue<T>::pop() {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto writeLock = sharedSpinLock->spin_write_lock();
|
||||
if(head->next == tail) {
|
||||
return false;
|
||||
} else {
|
||||
|
@ -211,7 +222,7 @@ bool TSLQueue<T>::pop() {
|
|||
template <typename T>
|
||||
std::unique_ptr<T> TSLQueue<T>::top_and_pop() {
|
||||
std::unique_ptr<T> result;
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto writeLock = sharedSpinLock->spin_write_lock();
|
||||
if(head->next != tail) {
|
||||
assert(head->next->data);
|
||||
result = std::unique_ptr<T>(new T);
|
||||
|
@ -229,7 +240,7 @@ std::unique_ptr<T> TSLQueue<T>::top_and_pop() {
|
|||
template <typename T>
|
||||
std::unique_ptr<T> TSLQueue<T>::top_and_pop_and_empty(bool *isEmpty) {
|
||||
std::unique_ptr<T> result;
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto writeLock = sharedSpinLock->spin_write_lock();
|
||||
if(head->next == tail) {
|
||||
if(isEmpty) {
|
||||
*isEmpty = true;
|
||||
|
@ -255,7 +266,7 @@ std::unique_ptr<T> TSLQueue<T>::top_and_pop_and_empty(bool *isEmpty) {
|
|||
template <typename T>
|
||||
std::unique_ptr<T> TSLQueue<T>::top_and_pop_and_rsize(unsigned long *rsize) {
|
||||
std::unique_ptr<T> result;
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto writeLock = sharedSpinLock->spin_write_lock();
|
||||
if(head->next == tail) {
|
||||
if(rsize) {
|
||||
*rsize = 0;
|
||||
|
@ -280,7 +291,7 @@ std::unique_ptr<T> TSLQueue<T>::top_and_pop_and_rsize(unsigned long *rsize) {
|
|||
|
||||
template <typename T>
|
||||
void TSLQueue<T>::clear() {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto writeLock = sharedSpinLock->spin_write_lock();
|
||||
|
||||
head->next = tail;
|
||||
tail->prev = head;
|
||||
|
@ -289,13 +300,13 @@ void TSLQueue<T>::clear() {
|
|||
|
||||
template <typename T>
|
||||
bool TSLQueue<T>::empty() {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto readLock = sharedSpinLock->spin_read_lock();
|
||||
return head->next == tail;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
unsigned long TSLQueue<T>::size() {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
auto readLock = sharedSpinLock->spin_read_lock();
|
||||
return msize;
|
||||
}
|
||||
|
||||
|
@ -313,20 +324,20 @@ bool TSLQueue<T>::TSLQNode::isNormal() const {
|
|||
}
|
||||
|
||||
template <typename T>
|
||||
TSLQueue<T>::TSLQIter::TSLQIter(std::mutex *mutex,
|
||||
TSLQueue<T>::TSLQIter::TSLQIter(UDPC::SharedSpinLock::Weak lockWeak,
|
||||
std::weak_ptr<TSLQNode> currentNode,
|
||||
unsigned long *msize) :
|
||||
mutex(mutex),
|
||||
sharedSpinLockWeak(lockWeak),
|
||||
readLock(std::unique_ptr<UDPC::LockObj<false>>(new UDPC::LockObj<false>{})),
|
||||
writeLock(),
|
||||
currentNode(currentNode),
|
||||
msize(msize)
|
||||
{
|
||||
mutex->lock();
|
||||
*readLock = lockWeak.lock()->spin_read_lock();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
TSLQueue<T>::TSLQIter::~TSLQIter() {
|
||||
mutex->unlock();
|
||||
}
|
||||
TSLQueue<T>::TSLQIter::~TSLQIter() {}
|
||||
|
||||
template <typename T>
|
||||
std::unique_ptr<T> TSLQueue<T>::TSLQIter::current() {
|
||||
|
@ -368,9 +379,61 @@ bool TSLQueue<T>::TSLQIter::prev() {
|
|||
|
||||
template <typename T>
|
||||
bool TSLQueue<T>::TSLQIter::remove() {
|
||||
if (readLock && !writeLock && readLock->isValid()) {
|
||||
auto sharedSpinLockStrong = sharedSpinLockWeak.lock();
|
||||
if (!sharedSpinLockStrong) {
|
||||
return false;
|
||||
}
|
||||
|
||||
writeLock = std::unique_ptr<UDPC::LockObj<true>>(new UDPC::LockObj<true>{});
|
||||
*writeLock = sharedSpinLockStrong->trade_read_for_write_lock(*readLock);
|
||||
readLock.reset(nullptr);
|
||||
|
||||
return remove_impl();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool TSLQueue<T>::TSLQIter::try_remove() {
|
||||
if (readLock && !writeLock && readLock->isValid()) {
|
||||
auto sharedSpinLockStrong = sharedSpinLockWeak.lock();
|
||||
if (!sharedSpinLockStrong) {
|
||||
return false;
|
||||
}
|
||||
|
||||
writeLock = std::unique_ptr<UDPC::LockObj<true>>(new UDPC::LockObj<true>{});
|
||||
*writeLock = sharedSpinLockStrong->try_trade_read_for_write_lock(*readLock);
|
||||
if (writeLock->isValid()) {
|
||||
readLock.reset(nullptr);
|
||||
return remove_impl();
|
||||
} else {
|
||||
writeLock.reset(nullptr);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool TSLQueue<T>::TSLQIter::remove_impl() {
|
||||
const auto cleanupWriteLock = [this] () {
|
||||
UDPC::SharedSpinLock::Ptr sharedSpinLockStrong = this->sharedSpinLockWeak.lock();
|
||||
if (!sharedSpinLockStrong) {
|
||||
writeLock.reset(nullptr);
|
||||
return;
|
||||
}
|
||||
this->readLock = std::unique_ptr<UDPC::LockObj<false>>(new UDPC::LockObj<false>{});
|
||||
(*this->readLock) = sharedSpinLockStrong->trade_write_for_read_lock(*(this->writeLock));
|
||||
this->writeLock.reset(nullptr);
|
||||
};
|
||||
|
||||
std::shared_ptr<TSLQNode> currentNode = this->currentNode.lock();
|
||||
assert(currentNode);
|
||||
if(!currentNode->isNormal()) {
|
||||
cleanupWriteLock();
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -384,12 +447,13 @@ bool TSLQueue<T>::TSLQIter::remove() {
|
|||
assert(*msize > 0);
|
||||
--(*msize);
|
||||
|
||||
cleanupWriteLock();
|
||||
return parent->next->isNormal();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename TSLQueue<T>::TSLQIter TSLQueue<T>::begin() {
|
||||
return TSLQIter(&mutex, head->next, &msize);
|
||||
return TSLQIter(sharedSpinLock, head->next, &msize);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -225,13 +225,14 @@ public:
|
|||
|
||||
char recvBuf[UDPC_PACKET_MAX_SIZE];
|
||||
/*
|
||||
* 0 - is threaded
|
||||
* 0 - is destucting
|
||||
* 1 - is client
|
||||
* 2 - libsodium enabled
|
||||
*/
|
||||
std::bitset<8> flags;
|
||||
std::atomic_bool isAcceptNewConnections;
|
||||
std::atomic_bool isReceivingEvents;
|
||||
std::atomic_bool isAutoUpdating;
|
||||
std::atomic_uint32_t protocolID;
|
||||
std::atomic_uint_fast8_t loggingType;
|
||||
// See UDPC_AuthPolicy enum in UDPC.h for possible values
|
||||
|
@ -275,6 +276,9 @@ public:
|
|||
std::mutex atostrBufIndexMutex;
|
||||
std::uint32_t atostrBufIndex;
|
||||
|
||||
std::mutex setThreadedUpdateMutex;
|
||||
std::atomic_uint32_t enableDisableFuncRunningCount;
|
||||
|
||||
}; // struct Context
|
||||
|
||||
Context *verifyContext(UDPC_HContext ctx);
|
||||
|
|
|
@ -222,6 +222,7 @@ _contextIdentifier(UDPC_CONTEXT_IDENTIFIER),
|
|||
flags(),
|
||||
isAcceptNewConnections(true),
|
||||
isReceivingEvents(false),
|
||||
isAutoUpdating(false),
|
||||
protocolID(UDPC_DEFAULT_PROTOCOL_ID),
|
||||
#ifndef NDEBUG
|
||||
loggingType(UDPC_DEBUG),
|
||||
|
@ -256,16 +257,20 @@ peerPKWhitelistMutex(),
|
|||
threadedSleepTime(std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT)),
|
||||
keysSet(),
|
||||
atostrBufIndexMutex(),
|
||||
atostrBufIndex(0)
|
||||
atostrBufIndex(0),
|
||||
setThreadedUpdateMutex(),
|
||||
enableDisableFuncRunningCount(0)
|
||||
{
|
||||
std::memset(atostrBuf, 0, UDPC_ATOSTR_SIZE);
|
||||
|
||||
if(isThreaded) {
|
||||
flags.set(0);
|
||||
isAutoUpdating.store(true);
|
||||
} else {
|
||||
flags.reset(0);
|
||||
isAutoUpdating.store(false);
|
||||
}
|
||||
|
||||
flags.reset(0);
|
||||
|
||||
rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count());
|
||||
|
||||
threadRunning.store(true);
|
||||
|
@ -2171,7 +2176,7 @@ UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId,
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
ctx->flags.set(0);
|
||||
ctx->isAutoUpdating.store(true);
|
||||
ctx->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT);
|
||||
ctx->thread = std::thread(UDPC::threadedUpdate, ctx);
|
||||
|
||||
|
@ -2189,7 +2194,7 @@ UDPC_HContext UDPC_init_threaded_update_ms(
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
ctx->flags.set(0);
|
||||
ctx->isAutoUpdating.store(true);
|
||||
if(updateMS < UDPC_UPDATE_MS_MIN) {
|
||||
ctx->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_MIN);
|
||||
} else if(updateMS > UDPC_UPDATE_MS_MAX) {
|
||||
|
@ -2206,26 +2211,47 @@ UDPC_HContext UDPC_init_threaded_update_ms(
|
|||
|
||||
int UDPC_enable_threaded_update(UDPC_HContext ctx) {
|
||||
UDPC::Context *c = UDPC::verifyContext(ctx);
|
||||
if(!c || c->flags.test(0) || c->thread.joinable()) {
|
||||
if (!c) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
c->flags.set(0);
|
||||
c->enableDisableFuncRunningCount.fetch_add(1);
|
||||
|
||||
std::lock_guard<std::mutex> setThreadedLock(c->setThreadedUpdateMutex);
|
||||
|
||||
if(c->flags.test(0) || c->isAutoUpdating.load() || c->thread.joinable()) {
|
||||
c->enableDisableFuncRunningCount.fetch_sub(1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
c->isAutoUpdating.store(true);
|
||||
c->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_DEFAULT);
|
||||
c->threadRunning.store(true);
|
||||
c->thread = std::thread(UDPC::threadedUpdate, c);
|
||||
|
||||
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update");
|
||||
|
||||
c->enableDisableFuncRunningCount.fetch_sub(1);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) {
|
||||
UDPC::Context *c = UDPC::verifyContext(ctx);
|
||||
if(!c || c->flags.test(0) || c->thread.joinable()) {
|
||||
if (!c) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
c->flags.set(0);
|
||||
c->enableDisableFuncRunningCount.fetch_add(1);
|
||||
|
||||
std::lock_guard<std::mutex> setThreadedLock(c->setThreadedUpdateMutex);
|
||||
|
||||
if(c->flags.test(0) || c->isAutoUpdating.load() || c->thread.joinable()) {
|
||||
c->enableDisableFuncRunningCount.fetch_sub(1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
c->isAutoUpdating.store(true);
|
||||
if(updateMS < UDPC_UPDATE_MS_MIN) {
|
||||
c->threadedSleepTime = std::chrono::milliseconds(UDPC_UPDATE_MS_MIN);
|
||||
} else if(updateMS > UDPC_UPDATE_MS_MAX) {
|
||||
|
@ -2237,20 +2263,35 @@ int UDPC_enable_threaded_update_ms(UDPC_HContext ctx, int updateMS) {
|
|||
c->thread = std::thread(UDPC::threadedUpdate, c);
|
||||
|
||||
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Started threaded update");
|
||||
|
||||
c->enableDisableFuncRunningCount.fetch_sub(1);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int UDPC_disable_threaded_update(UDPC_HContext ctx) {
|
||||
UDPC::Context *c = UDPC::verifyContext(ctx);
|
||||
if(!c || !c->flags.test(0) || !c->thread.joinable()) {
|
||||
if (!c) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
c->enableDisableFuncRunningCount.fetch_add(1);
|
||||
|
||||
std::lock_guard<std::mutex> setThreadedLock(c->setThreadedUpdateMutex);
|
||||
|
||||
if(c->flags.test(0) || !c->isAutoUpdating.load() || !c->thread.joinable()) {
|
||||
c->enableDisableFuncRunningCount.fetch_sub(1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
c->threadRunning.store(false);
|
||||
c->thread.join();
|
||||
c->flags.reset(0);
|
||||
c->isAutoUpdating.store(false);
|
||||
|
||||
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "Stopped threaded update");
|
||||
|
||||
c->enableDisableFuncRunningCount.fetch_sub(1);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -2261,11 +2302,33 @@ int UDPC_is_valid_context(UDPC_HContext ctx) {
|
|||
void UDPC_destroy(UDPC_HContext ctx) {
|
||||
UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx);
|
||||
if(UDPC_ctx) {
|
||||
// stop thread if threaded
|
||||
if(UDPC_ctx->flags.test(0)) {
|
||||
{
|
||||
// Acquire lock so that this code does not run at the same time as
|
||||
// enabling/disabling threaded-update.
|
||||
std::lock_guard<std::mutex>
|
||||
setThreadedLock(UDPC_ctx->setThreadedUpdateMutex);
|
||||
|
||||
// Stop thread if threaded.
|
||||
// Set atomic bool to false always so that the thread will always
|
||||
// stop at this point.
|
||||
UDPC_ctx->threadRunning.store(false);
|
||||
if(UDPC_ctx->isAutoUpdating.load() && UDPC_ctx->thread.joinable()) {
|
||||
UDPC_ctx->thread.join();
|
||||
}
|
||||
UDPC_ctx->isAutoUpdating.store(false);
|
||||
|
||||
// Set destructing flag.
|
||||
UDPC_ctx->flags.set(0);
|
||||
|
||||
// Drop lock at this point before destructing the context.
|
||||
}
|
||||
|
||||
// After lock has been dropped, wait in case there are enable/disable
|
||||
// threaded update functions waiting on the lock. Do this via a
|
||||
// atomic-int-based spin-lock.
|
||||
while(UDPC_ctx->enableDisableFuncRunningCount.load() != 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
|
||||
#if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS
|
||||
WSACleanup();
|
||||
|
@ -2277,7 +2340,7 @@ void UDPC_destroy(UDPC_HContext ctx) {
|
|||
|
||||
void UDPC_update(UDPC_HContext ctx) {
|
||||
UDPC::Context *c = UDPC::verifyContext(ctx);
|
||||
if(!c || c->flags.test(0)) {
|
||||
if(!c || c->isAutoUpdating.load()) {
|
||||
// invalid or is threaded, update should not be called
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -143,7 +143,8 @@ TEST(TSLQueue, Iterator) {
|
|||
// test that lock is held by iterator
|
||||
EXPECT_FALSE(q.push_nb(10));
|
||||
op = q.top_nb();
|
||||
EXPECT_FALSE(op);
|
||||
// Getting top and iterator both hold read locks so this should be true.
|
||||
EXPECT_TRUE(op);
|
||||
|
||||
// backwards iteration
|
||||
EXPECT_TRUE(iter.prev());
|
||||
|
@ -175,6 +176,21 @@ TEST(TSLQueue, Iterator) {
|
|||
op = iter.current();
|
||||
EXPECT_TRUE(op);
|
||||
EXPECT_EQ(*op, 2);
|
||||
|
||||
// second iterator
|
||||
auto iter2 = q.begin();
|
||||
|
||||
// Still should be able to get top.
|
||||
EXPECT_TRUE(iter2.current());
|
||||
|
||||
// Shouldn't be able to remove if 2 iterators exist.
|
||||
EXPECT_FALSE(iter2.try_remove());
|
||||
|
||||
// This will never return since the first iterator has a "read" lock.
|
||||
//EXPECT_FALSE(iter2.remove());
|
||||
|
||||
// Still should be able to get top.
|
||||
EXPECT_TRUE(iter2.current());
|
||||
}
|
||||
EXPECT_EQ(q.size(), 9);
|
||||
|
||||
|
|
|
@ -434,3 +434,29 @@ TEST(UDPC, free_packet_ptr) {
|
|||
UDPC_free_PacketInfo_ptr(&pinfo);
|
||||
UDPC_free_PacketInfo_ptr(nullptr);
|
||||
}
|
||||
|
||||
TEST(UDPC, enableDisableThreadedUpdate_StressTest) {
|
||||
UDPC_ConnectionId id = UDPC_create_id_anyaddr(0);
|
||||
UDPC_HContext ctx = UDPC_init(id, 0, 0);
|
||||
|
||||
std::array<std::thread, 100> thread_array;
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
if (i % 2 == 0) {
|
||||
thread_array[i] = std::thread([] (UDPC_HContext ctx) {
|
||||
UDPC_enable_threaded_update(ctx);
|
||||
}, ctx);
|
||||
} else {
|
||||
thread_array[i] = std::thread([] (UDPC_HContext ctx) {
|
||||
UDPC_disable_threaded_update(ctx);
|
||||
}, ctx);
|
||||
}
|
||||
}
|
||||
|
||||
thread_array[0].join();
|
||||
|
||||
UDPC_destroy(ctx);
|
||||
|
||||
for (int i = 1; i < 100; ++i) {
|
||||
thread_array[i].join();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue