Change TSQueue to be a wrapper around a queue

This commit is contained in:
Stephen Seo 2019-07-07 14:13:16 +09:00
parent 973e71ead0
commit f10c53d92c
6 changed files with 113 additions and 233 deletions

3
.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "cpp_impl/RingBuffer"]
path = cpp_impl/RingBuffer
url = https://github.com/Stephen-Seo/RingBuffer.git

View file

@ -3,9 +3,14 @@ project(UDPConnection)
set(UDPConnection_VERSION 1.0) set(UDPConnection_VERSION 1.0)
if(NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/RingBuffer/src)
message(FATAL_ERROR "RingBuffer is missing!\nPlease update the \
RingBuffer submodule by running 'git submodule init' and 'git submodule \
update'!")
endif()
set(UDPConnection_SOURCES set(UDPConnection_SOURCES
src/UDPConnection.cpp src/UDPConnection.cpp
src/TSQueue.cpp
) )
set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wpedantic -Wno-missing-braces") set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wpedantic -Wno-missing-braces")
@ -22,9 +27,12 @@ add_library(UDPConnection ${UDPConnection_SOURCES})
set_target_properties(UDPConnection PROPERTIES VERSION ${UDPConnection_VERSION}) set_target_properties(UDPConnection PROPERTIES VERSION ${UDPConnection_VERSION})
target_compile_features(UDPConnection PUBLIC cxx_std_11) target_compile_features(UDPConnection PUBLIC cxx_std_17)
target_link_libraries(UDPConnection PUBLIC pthread) target_link_libraries(UDPConnection PUBLIC pthread)
target_include_directories(UDPConnection PUBLIC
"${CMAKE_CURRENT_SOURCE_DIR}/RingBuffer/src")
if(CMAKE_BUILD_TYPE MATCHES "Debug") if(CMAKE_BUILD_TYPE MATCHES "Debug")
find_package(GTest QUIET) find_package(GTest QUIET)
@ -34,6 +42,7 @@ if(CMAKE_BUILD_TYPE MATCHES "Debug")
src/test/TestTSQueue.cpp src/test/TestTSQueue.cpp
) )
add_executable(UnitTest ${UDPC_UnitTest_SOURCES}) add_executable(UnitTest ${UDPC_UnitTest_SOURCES})
target_compile_features(UnitTest PUBLIC cxx_std_17)
target_link_libraries(UnitTest PUBLIC UDPConnection ${GTEST_BOTH_LIBRARIES}) target_link_libraries(UnitTest PUBLIC UDPConnection ${GTEST_BOTH_LIBRARIES})
target_include_directories(UnitTest PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src) target_include_directories(UnitTest PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src)
endif() endif()

1
cpp_impl/RingBuffer Submodule

@ -0,0 +1 @@
Subproject commit 2873bfb55467a9b236382b77db18408a661c4542

View file

@ -1,184 +0,0 @@
#include "TSQueue.hpp"
#include <cstring>
TSQueue::TSQueue(unsigned int elemSize, unsigned int capacity)
: elemSize(elemSize), head(0), tail(0), isEmpty(true),
spinLock(false) {
if (elemSize == 0) {
this->elemSize = 1;
}
if (capacity == 0) {
this->capacityBytes = UDPC_TSQUEUE_DEFAULT_CAPACITY * this->elemSize;
} else {
this->capacityBytes = capacity * this->elemSize;
}
this->buffer =
std::unique_ptr<unsigned char[]>(new unsigned char[this->capacityBytes]);
}
TSQueue::~TSQueue() {}
bool TSQueue::push(void *data) {
while (spinLock.exchange(true) == true) {
}
if (!isEmpty && head == tail) {
spinLock.store(false);
return false;
}
memcpy(buffer.get() + tail, data, elemSize);
tail = (tail + elemSize) % capacityBytes;
isEmpty = false;
spinLock.store(false);
return true;
}
std::unique_ptr<unsigned char[]> TSQueue::top() {
while (spinLock.exchange(true) == true) {
}
if (isEmpty) {
spinLock.store(false);
return std::unique_ptr<unsigned char[]>();
}
auto data = std::unique_ptr<unsigned char[]>(new unsigned char[elemSize]);
memcpy(data.get(), buffer.get() + head, elemSize);
spinLock.store(false);
return data;
}
bool TSQueue::pop() {
while (spinLock.exchange(true) == true) {
}
if (isEmpty) {
spinLock.store(false);
return false;
}
head += elemSize;
if (head >= capacityBytes) {
head = 0;
}
if (head == tail) {
isEmpty = true;
}
spinLock.store(false);
return true;
}
void TSQueue::clear() {
while (spinLock.exchange(true) == true) {
}
head = 0;
tail = 0;
isEmpty = 0;
spinLock.store(false);
}
void TSQueue::changeCapacity(unsigned int newCapacity) {
if (newCapacity == 0) {
return;
}
while (spinLock.exchange(true) == true) {
}
// repeat of sizeBytes() to avoid deadlock
unsigned int size;
if (head == tail) {
size = capacityBytes;
} else if (head < tail) {
size = tail - head;
} else {
size = capacityBytes - head + tail;
}
unsigned int newCap = newCapacity * elemSize;
auto newBuffer =
std::unique_ptr<unsigned char[]>(new unsigned char[newCap]);
if (!isEmpty) {
unsigned int tempHead = head;
if (size > newCap) {
unsigned int diff = size - newCap;
tempHead = (head + diff) % capacityBytes;
}
if (tempHead < tail) {
memcpy(newBuffer.get(), buffer.get() + tempHead, tail - tempHead);
} else {
memcpy(newBuffer.get(), buffer.get() + tempHead,
capacityBytes - tempHead);
if (tail != 0) {
memcpy(newBuffer.get() + capacityBytes - tempHead, buffer.get(),
tail);
}
}
}
if (size < newCap) {
if (head < tail) {
tail = tail - head;
head = 0;
} else {
tail = capacityBytes - head + tail;
head = 0;
}
} else {
head = 0;
tail = 0;
isEmpty = false;
}
buffer = std::move(newBuffer);
capacityBytes = newCap;
spinLock.store(false);
}
unsigned int TSQueue::size() {
while (spinLock.exchange(true) == true) {
}
if (isEmpty) {
spinLock.store(false);
return 0;
}
unsigned int size;
if (head == tail) {
size = capacityBytes;
} else if (head < tail) {
size = tail - head;
} else {
size = capacityBytes - head + tail;
}
size /= elemSize;
spinLock.store(false);
return size;
}
unsigned int TSQueue::sizeBytes() {
while (spinLock.exchange(true) == true) {
}
if (isEmpty) {
spinLock.store(false);
return 0;
}
unsigned int size;
if (head == tail) {
size = capacityBytes;
} else if (head < tail) {
size = tail - head;
} else {
size = capacityBytes - head + tail;
}
spinLock.store(false);
return size;
}

View file

@ -7,12 +7,12 @@
#include <cstdlib> #include <cstdlib>
#include <memory> #include <memory>
#include <RB/RingBuffer.hpp>
template <typename T>
class TSQueue { class TSQueue {
public: public:
typedef std::unique_ptr<unsigned char[]> TopType; TSQueue(unsigned int capacity = UDPC_TSQUEUE_DEFAULT_CAPACITY);
TSQueue(unsigned int elemSize,
unsigned int capacity = UDPC_TSQUEUE_DEFAULT_CAPACITY);
~TSQueue(); ~TSQueue();
// disable copy // disable copy
@ -22,23 +22,82 @@ class TSQueue {
TSQueue(TSQueue &&other) = delete; TSQueue(TSQueue &&other) = delete;
TSQueue &operator=(TSQueue &&other) = delete; TSQueue &operator=(TSQueue &&other) = delete;
bool push(void *data); bool push(const T &data);
TopType top(); T top();
bool pop(); bool pop();
void clear(); void clear();
void changeCapacity(unsigned int newCapacity); void changeCapacity(unsigned int newCapacity);
unsigned int size(); unsigned int size();
private: private:
unsigned int elemSize;
unsigned int capacityBytes;
unsigned int head;
unsigned int tail;
bool isEmpty;
std::unique_ptr<unsigned char[]> buffer;
std::atomic_bool spinLock; std::atomic_bool spinLock;
RB::RingBuffer<T> rb;
unsigned int sizeBytes();
}; };
template <typename T>
TSQueue<T>::TSQueue(unsigned int capacity) :
spinLock(false),
rb(capacity)
{
rb.setResizePolicy(false);
}
template <typename T>
TSQueue<T>::~TSQueue()
{}
template <typename T>
bool TSQueue<T>::push(const T &data) {
while(spinLock.exchange(true)) {}
if(rb.getSize() == rb.getCapacity()) {
spinLock.store(false);
return false;
}
rb.push(data);
spinLock.store(false);
return true;
}
template <typename T>
T TSQueue<T>::top() {
while(spinLock.exchange(true)) {}
T value = rb.top();
spinLock.store(false);
return value;
}
template <typename T>
bool TSQueue<T>::pop() {
while(spinLock.exchange(true)) {}
if(rb.empty()) {
spinLock.store(false);
return false;
}
rb.pop();
spinLock.store(false);
return true;
}
template <typename T>
void TSQueue<T>::clear() {
while(spinLock.exchange(true)) {}
rb.resize(0);
spinLock.store(false);
}
template <typename T>
void TSQueue<T>::changeCapacity(unsigned int newCapacity) {
while(spinLock.exchange(true)) {}
rb.changeCapacity(newCapacity);
spinLock.store(false);
}
template <typename T>
unsigned int TSQueue<T>::size() {
while(spinLock.exchange(true)) {}
unsigned int size = rb.getSize();
spinLock.store(false);
return size;
}
#endif #endif

View file

@ -7,92 +7,85 @@
TEST(TSQueue, Usage) TEST(TSQueue, Usage)
{ {
TSQueue q(sizeof(int), 4); TSQueue<int> q(4);
int temp = 100; int temp = 100;
EXPECT_EQ(q.size(), 0); EXPECT_EQ(q.size(), 0);
EXPECT_FALSE(q.pop()); EXPECT_FALSE(q.pop());
EXPECT_TRUE(q.push(&temp)); EXPECT_TRUE(q.push(temp));
EXPECT_EQ(q.size(), 1); EXPECT_EQ(q.size(), 1);
// { 100 } // { 100 }
temp = 200; temp = 200;
EXPECT_TRUE(q.push(&temp)); EXPECT_TRUE(q.push(temp));
EXPECT_EQ(q.size(), 2); EXPECT_EQ(q.size(), 2);
auto top = q.top(); EXPECT_EQ(100, q.top());
EXPECT_EQ(100, *((int*)top.get()));
// { 100, 200 } // { 100, 200 }
temp = 300; temp = 300;
EXPECT_TRUE(q.push(&temp)); EXPECT_TRUE(q.push(temp));
EXPECT_EQ(q.size(), 3); EXPECT_EQ(q.size(), 3);
// { 100, 200, 300 } // { 100, 200, 300 }
temp = 400; temp = 400;
EXPECT_TRUE(q.push(&temp)); EXPECT_TRUE(q.push(temp));
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
// { 100, 200, 300, 400 } // { 100, 200, 300, 400 }
temp = 500; temp = 500;
EXPECT_FALSE(q.push(&temp)); EXPECT_FALSE(q.push(temp));
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
top = q.top(); EXPECT_EQ(100, q.top());
EXPECT_EQ(100, *((int*)top.get()));
EXPECT_TRUE(q.pop()); EXPECT_TRUE(q.pop());
EXPECT_EQ(q.size(), 3); EXPECT_EQ(q.size(), 3);
// { 200, 300, 400 } // { 200, 300, 400 }
top = q.top(); EXPECT_EQ(200, q.top());
EXPECT_EQ(200, *((int*)top.get()));
temp = 1; temp = 1;
EXPECT_TRUE(q.push(&temp)); EXPECT_TRUE(q.push(temp));
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
// { 200, 300, 400, 1 } // { 200, 300, 400, 1 }
top = q.top(); EXPECT_EQ(200, q.top());
EXPECT_EQ(200, *((int*)top.get()));
temp = 2; temp = 2;
EXPECT_FALSE(q.push(&temp)); EXPECT_FALSE(q.push(temp));
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
q.changeCapacity(8); q.changeCapacity(8);
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
temp = 10; temp = 10;
EXPECT_TRUE(q.push(&temp)); EXPECT_TRUE(q.push(temp));
EXPECT_EQ(q.size(), 5); EXPECT_EQ(q.size(), 5);
// { 200, 300, 400, 1, 10 } // { 200, 300, 400, 1, 10 }
top = q.top(); EXPECT_EQ(200, q.top());
EXPECT_EQ(200, *((int*)top.get()));
EXPECT_TRUE(q.pop()); EXPECT_TRUE(q.pop());
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
// { 300, 400, 1, 10 } // { 300, 400, 1, 10 }
top = q.top(); EXPECT_EQ(300, q.top());
EXPECT_EQ(300, *((int*)top.get()));
EXPECT_TRUE(q.pop()); EXPECT_TRUE(q.pop());
EXPECT_EQ(q.size(), 3); EXPECT_EQ(q.size(), 3);
// { 400, 1, 10 } // { 400, 1, 10 }
top = q.top(); EXPECT_EQ(400, q.top());
EXPECT_EQ(400, *((int*)top.get()));
q.changeCapacity(1); q.changeCapacity(1);
@ -100,8 +93,7 @@ TEST(TSQueue, Usage)
EXPECT_EQ(q.size(), 1); EXPECT_EQ(q.size(), 1);
top = q.top(); EXPECT_EQ(10, q.top());
EXPECT_EQ(10, *((int*)top.get()));
EXPECT_TRUE(q.pop()); EXPECT_TRUE(q.pop());
@ -113,13 +105,13 @@ TEST(TSQueue, Usage)
TEST(TSQueue, Concurrent) TEST(TSQueue, Concurrent)
{ {
TSQueue q(sizeof(int), 4); TSQueue<int> q(4);
auto a0 = std::async(std::launch::async, [&q] () {int i = 0; return q.push(&i); }); auto a0 = std::async(std::launch::async, [&q] () {int i = 0; return q.push(i); });
auto a1 = std::async(std::launch::async, [&q] () {int i = 1; return q.push(&i); }); auto a1 = std::async(std::launch::async, [&q] () {int i = 1; return q.push(i); });
auto a2 = std::async(std::launch::async, [&q] () {int i = 2; return q.push(&i); }); auto a2 = std::async(std::launch::async, [&q] () {int i = 2; return q.push(i); });
auto a3 = std::async(std::launch::async, [&q] () {int i = 3; return q.push(&i); }); auto a3 = std::async(std::launch::async, [&q] () {int i = 3; return q.push(i); });
auto a4 = std::async(std::launch::async, [&q] () {int i = 4; return q.push(&i); }); auto a4 = std::async(std::launch::async, [&q] () {int i = 4; return q.push(i); });
bool results[] = { bool results[] = {
a0.get(), a0.get(),
@ -139,12 +131,12 @@ TEST(TSQueue, Concurrent)
EXPECT_EQ(insertCount, 4); EXPECT_EQ(insertCount, 4);
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
TSQueue::TopType top; int top;
for(int i = 0; i < 4; ++i) { for(int i = 0; i < 4; ++i) {
top = q.top(); top = q.top();
EXPECT_TRUE(q.pop()); EXPECT_TRUE(q.pop());
EXPECT_EQ(q.size(), 3 - i); EXPECT_EQ(q.size(), 3 - i);
printf("%d ", *((int*)top.get())); printf("%d ", top);
} }
printf("\n"); printf("\n");