From f10c53d92c1fc89197227ed235e507b2036d96cc Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Sun, 7 Jul 2019 14:13:16 +0900 Subject: [PATCH] Change TSQueue to be a wrapper around a queue --- .gitmodules | 3 + cpp_impl/CMakeLists.txt | 13 ++- cpp_impl/RingBuffer | 1 + cpp_impl/src/TSQueue.cpp | 184 ------------------------------ cpp_impl/src/TSQueue.hpp | 87 +++++++++++--- cpp_impl/src/test/TestTSQueue.cpp | 58 ++++------ 6 files changed, 113 insertions(+), 233 deletions(-) create mode 100644 .gitmodules create mode 160000 cpp_impl/RingBuffer delete mode 100644 cpp_impl/src/TSQueue.cpp diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..01f3ed5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "cpp_impl/RingBuffer"] + path = cpp_impl/RingBuffer + url = https://github.com/Stephen-Seo/RingBuffer.git diff --git a/cpp_impl/CMakeLists.txt b/cpp_impl/CMakeLists.txt index 8923206..377f556 100644 --- a/cpp_impl/CMakeLists.txt +++ b/cpp_impl/CMakeLists.txt @@ -3,9 +3,14 @@ project(UDPConnection) set(UDPConnection_VERSION 1.0) +if(NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/RingBuffer/src) + message(FATAL_ERROR "RingBuffer is missing!\nPlease update the \ +RingBuffer submodule by running 'git submodule init' and 'git submodule \ +update'!") +endif() + set(UDPConnection_SOURCES src/UDPConnection.cpp - src/TSQueue.cpp ) set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wpedantic -Wno-missing-braces") @@ -22,9 +27,12 @@ add_library(UDPConnection ${UDPConnection_SOURCES}) set_target_properties(UDPConnection PROPERTIES VERSION ${UDPConnection_VERSION}) -target_compile_features(UDPConnection PUBLIC cxx_std_11) +target_compile_features(UDPConnection PUBLIC cxx_std_17) target_link_libraries(UDPConnection PUBLIC pthread) +target_include_directories(UDPConnection PUBLIC + "${CMAKE_CURRENT_SOURCE_DIR}/RingBuffer/src") + if(CMAKE_BUILD_TYPE MATCHES "Debug") find_package(GTest QUIET) @@ -34,6 +42,7 @@ if(CMAKE_BUILD_TYPE MATCHES "Debug") src/test/TestTSQueue.cpp ) add_executable(UnitTest ${UDPC_UnitTest_SOURCES}) + target_compile_features(UnitTest PUBLIC cxx_std_17) target_link_libraries(UnitTest PUBLIC UDPConnection ${GTEST_BOTH_LIBRARIES}) target_include_directories(UnitTest PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src) endif() diff --git a/cpp_impl/RingBuffer b/cpp_impl/RingBuffer new file mode 160000 index 0000000..2873bfb --- /dev/null +++ b/cpp_impl/RingBuffer @@ -0,0 +1 @@ +Subproject commit 2873bfb55467a9b236382b77db18408a661c4542 diff --git a/cpp_impl/src/TSQueue.cpp b/cpp_impl/src/TSQueue.cpp deleted file mode 100644 index 39c342e..0000000 --- a/cpp_impl/src/TSQueue.cpp +++ /dev/null @@ -1,184 +0,0 @@ -#include "TSQueue.hpp" - -#include - -TSQueue::TSQueue(unsigned int elemSize, unsigned int capacity) - : elemSize(elemSize), head(0), tail(0), isEmpty(true), - spinLock(false) { - if (elemSize == 0) { - this->elemSize = 1; - } - if (capacity == 0) { - this->capacityBytes = UDPC_TSQUEUE_DEFAULT_CAPACITY * this->elemSize; - } else { - this->capacityBytes = capacity * this->elemSize; - } - - this->buffer = - std::unique_ptr(new unsigned char[this->capacityBytes]); -} - -TSQueue::~TSQueue() {} - -bool TSQueue::push(void *data) { - while (spinLock.exchange(true) == true) { - } - if (!isEmpty && head == tail) { - spinLock.store(false); - return false; - } - - memcpy(buffer.get() + tail, data, elemSize); - tail = (tail + elemSize) % capacityBytes; - - isEmpty = false; - - spinLock.store(false); - return true; -} - -std::unique_ptr TSQueue::top() { - while (spinLock.exchange(true) == true) { - } - if (isEmpty) { - spinLock.store(false); - return std::unique_ptr(); - } - - auto data = std::unique_ptr(new unsigned char[elemSize]); - memcpy(data.get(), buffer.get() + head, elemSize); - spinLock.store(false); - return data; -} - -bool TSQueue::pop() { - while (spinLock.exchange(true) == true) { - } - if (isEmpty) { - spinLock.store(false); - return false; - } - head += elemSize; - if (head >= capacityBytes) { - head = 0; - } - if (head == tail) { - isEmpty = true; - } - spinLock.store(false); - return true; -} - -void TSQueue::clear() { - while (spinLock.exchange(true) == true) { - } - - head = 0; - tail = 0; - isEmpty = 0; - - spinLock.store(false); -} - -void TSQueue::changeCapacity(unsigned int newCapacity) { - if (newCapacity == 0) { - return; - } - while (spinLock.exchange(true) == true) { - } - - // repeat of sizeBytes() to avoid deadlock - unsigned int size; - if (head == tail) { - size = capacityBytes; - } else if (head < tail) { - size = tail - head; - } else { - size = capacityBytes - head + tail; - } - - unsigned int newCap = newCapacity * elemSize; - auto newBuffer = - std::unique_ptr(new unsigned char[newCap]); - - if (!isEmpty) { - unsigned int tempHead = head; - if (size > newCap) { - unsigned int diff = size - newCap; - tempHead = (head + diff) % capacityBytes; - } - if (tempHead < tail) { - memcpy(newBuffer.get(), buffer.get() + tempHead, tail - tempHead); - } else { - memcpy(newBuffer.get(), buffer.get() + tempHead, - capacityBytes - tempHead); - if (tail != 0) { - memcpy(newBuffer.get() + capacityBytes - tempHead, buffer.get(), - tail); - } - } - } - - if (size < newCap) { - if (head < tail) { - tail = tail - head; - head = 0; - } else { - tail = capacityBytes - head + tail; - head = 0; - } - } else { - head = 0; - tail = 0; - isEmpty = false; - } - buffer = std::move(newBuffer); - capacityBytes = newCap; - - spinLock.store(false); -} - -unsigned int TSQueue::size() { - while (spinLock.exchange(true) == true) { - } - - if (isEmpty) { - spinLock.store(false); - return 0; - } - - unsigned int size; - if (head == tail) { - size = capacityBytes; - } else if (head < tail) { - size = tail - head; - } else { - size = capacityBytes - head + tail; - } - size /= elemSize; - - spinLock.store(false); - return size; -} - -unsigned int TSQueue::sizeBytes() { - while (spinLock.exchange(true) == true) { - } - - if (isEmpty) { - spinLock.store(false); - return 0; - } - - unsigned int size; - if (head == tail) { - size = capacityBytes; - } else if (head < tail) { - size = tail - head; - } else { - size = capacityBytes - head + tail; - } - - spinLock.store(false); - return size; -} diff --git a/cpp_impl/src/TSQueue.hpp b/cpp_impl/src/TSQueue.hpp index 0a5e616..badb7d2 100644 --- a/cpp_impl/src/TSQueue.hpp +++ b/cpp_impl/src/TSQueue.hpp @@ -7,12 +7,12 @@ #include #include +#include + +template class TSQueue { public: - typedef std::unique_ptr TopType; - - TSQueue(unsigned int elemSize, - unsigned int capacity = UDPC_TSQUEUE_DEFAULT_CAPACITY); + TSQueue(unsigned int capacity = UDPC_TSQUEUE_DEFAULT_CAPACITY); ~TSQueue(); // disable copy @@ -22,23 +22,82 @@ class TSQueue { TSQueue(TSQueue &&other) = delete; TSQueue &operator=(TSQueue &&other) = delete; - bool push(void *data); - TopType top(); + bool push(const T &data); + T top(); bool pop(); void clear(); void changeCapacity(unsigned int newCapacity); unsigned int size(); private: - unsigned int elemSize; - unsigned int capacityBytes; - unsigned int head; - unsigned int tail; - bool isEmpty; - std::unique_ptr buffer; std::atomic_bool spinLock; - - unsigned int sizeBytes(); + RB::RingBuffer rb; }; +template +TSQueue::TSQueue(unsigned int capacity) : +spinLock(false), +rb(capacity) +{ + rb.setResizePolicy(false); +} + +template +TSQueue::~TSQueue() +{} + +template +bool TSQueue::push(const T &data) { + while(spinLock.exchange(true)) {} + if(rb.getSize() == rb.getCapacity()) { + spinLock.store(false); + return false; + } + rb.push(data); + spinLock.store(false); + return true; +} + +template +T TSQueue::top() { + while(spinLock.exchange(true)) {} + T value = rb.top(); + spinLock.store(false); + return value; +} + +template +bool TSQueue::pop() { + while(spinLock.exchange(true)) {} + if(rb.empty()) { + spinLock.store(false); + return false; + } + rb.pop(); + spinLock.store(false); + return true; +} + +template +void TSQueue::clear() { + while(spinLock.exchange(true)) {} + rb.resize(0); + spinLock.store(false); +} + +template +void TSQueue::changeCapacity(unsigned int newCapacity) { + while(spinLock.exchange(true)) {} + rb.changeCapacity(newCapacity); + spinLock.store(false); +} + +template +unsigned int TSQueue::size() { + while(spinLock.exchange(true)) {} + unsigned int size = rb.getSize(); + spinLock.store(false); + return size; +} + #endif diff --git a/cpp_impl/src/test/TestTSQueue.cpp b/cpp_impl/src/test/TestTSQueue.cpp index f1adeea..58e2a7d 100644 --- a/cpp_impl/src/test/TestTSQueue.cpp +++ b/cpp_impl/src/test/TestTSQueue.cpp @@ -7,92 +7,85 @@ TEST(TSQueue, Usage) { - TSQueue q(sizeof(int), 4); + TSQueue q(4); int temp = 100; EXPECT_EQ(q.size(), 0); EXPECT_FALSE(q.pop()); - EXPECT_TRUE(q.push(&temp)); + EXPECT_TRUE(q.push(temp)); EXPECT_EQ(q.size(), 1); // { 100 } temp = 200; - EXPECT_TRUE(q.push(&temp)); + EXPECT_TRUE(q.push(temp)); EXPECT_EQ(q.size(), 2); - auto top = q.top(); - EXPECT_EQ(100, *((int*)top.get())); + EXPECT_EQ(100, q.top()); // { 100, 200 } temp = 300; - EXPECT_TRUE(q.push(&temp)); + EXPECT_TRUE(q.push(temp)); EXPECT_EQ(q.size(), 3); // { 100, 200, 300 } temp = 400; - EXPECT_TRUE(q.push(&temp)); + EXPECT_TRUE(q.push(temp)); EXPECT_EQ(q.size(), 4); // { 100, 200, 300, 400 } temp = 500; - EXPECT_FALSE(q.push(&temp)); + EXPECT_FALSE(q.push(temp)); EXPECT_EQ(q.size(), 4); - top = q.top(); - EXPECT_EQ(100, *((int*)top.get())); + EXPECT_EQ(100, q.top()); EXPECT_TRUE(q.pop()); EXPECT_EQ(q.size(), 3); // { 200, 300, 400 } - top = q.top(); - EXPECT_EQ(200, *((int*)top.get())); + EXPECT_EQ(200, q.top()); temp = 1; - EXPECT_TRUE(q.push(&temp)); + EXPECT_TRUE(q.push(temp)); EXPECT_EQ(q.size(), 4); // { 200, 300, 400, 1 } - top = q.top(); - EXPECT_EQ(200, *((int*)top.get())); + EXPECT_EQ(200, q.top()); temp = 2; - EXPECT_FALSE(q.push(&temp)); + EXPECT_FALSE(q.push(temp)); EXPECT_EQ(q.size(), 4); q.changeCapacity(8); EXPECT_EQ(q.size(), 4); temp = 10; - EXPECT_TRUE(q.push(&temp)); + EXPECT_TRUE(q.push(temp)); EXPECT_EQ(q.size(), 5); // { 200, 300, 400, 1, 10 } - top = q.top(); - EXPECT_EQ(200, *((int*)top.get())); + EXPECT_EQ(200, q.top()); EXPECT_TRUE(q.pop()); EXPECT_EQ(q.size(), 4); // { 300, 400, 1, 10 } - top = q.top(); - EXPECT_EQ(300, *((int*)top.get())); + EXPECT_EQ(300, q.top()); EXPECT_TRUE(q.pop()); EXPECT_EQ(q.size(), 3); // { 400, 1, 10 } - top = q.top(); - EXPECT_EQ(400, *((int*)top.get())); + EXPECT_EQ(400, q.top()); q.changeCapacity(1); @@ -100,8 +93,7 @@ TEST(TSQueue, Usage) EXPECT_EQ(q.size(), 1); - top = q.top(); - EXPECT_EQ(10, *((int*)top.get())); + EXPECT_EQ(10, q.top()); EXPECT_TRUE(q.pop()); @@ -113,13 +105,13 @@ TEST(TSQueue, Usage) TEST(TSQueue, Concurrent) { - TSQueue q(sizeof(int), 4); + TSQueue q(4); - auto a0 = std::async(std::launch::async, [&q] () {int i = 0; return q.push(&i); }); - auto a1 = std::async(std::launch::async, [&q] () {int i = 1; return q.push(&i); }); - auto a2 = std::async(std::launch::async, [&q] () {int i = 2; return q.push(&i); }); - auto a3 = std::async(std::launch::async, [&q] () {int i = 3; return q.push(&i); }); - auto a4 = std::async(std::launch::async, [&q] () {int i = 4; return q.push(&i); }); + auto a0 = std::async(std::launch::async, [&q] () {int i = 0; return q.push(i); }); + auto a1 = std::async(std::launch::async, [&q] () {int i = 1; return q.push(i); }); + auto a2 = std::async(std::launch::async, [&q] () {int i = 2; return q.push(i); }); + auto a3 = std::async(std::launch::async, [&q] () {int i = 3; return q.push(i); }); + auto a4 = std::async(std::launch::async, [&q] () {int i = 4; return q.push(i); }); bool results[] = { a0.get(), @@ -139,12 +131,12 @@ TEST(TSQueue, Concurrent) EXPECT_EQ(insertCount, 4); EXPECT_EQ(q.size(), 4); - TSQueue::TopType top; + int top; for(int i = 0; i < 4; ++i) { top = q.top(); EXPECT_TRUE(q.pop()); EXPECT_EQ(q.size(), 3 - i); - printf("%d ", *((int*)top.get())); + printf("%d ", top); } printf("\n");