Create Thread-Safe-Queue (untested)
This commit is contained in:
parent
4b56bda4b1
commit
2aa4600c57
4 changed files with 111 additions and 0 deletions
1
cpp_impl/.gitignore
vendored
1
cpp_impl/.gitignore
vendored
|
@ -1,3 +1,4 @@
|
|||
build*/
|
||||
.clangd/
|
||||
compile_commands.json
|
||||
*.o
|
||||
|
|
|
@ -5,6 +5,7 @@ set(UDPConnection_VERSION 1.0)
|
|||
|
||||
set(UDPConnection_SOURCES
|
||||
src/UDPConnection.cpp
|
||||
src/TSQueue.cpp
|
||||
)
|
||||
|
||||
set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wpedantic -Wno-missing-braces")
|
||||
|
|
72
cpp_impl/src/TSQueue.cpp
Normal file
72
cpp_impl/src/TSQueue.cpp
Normal file
|
@ -0,0 +1,72 @@
|
|||
#include "TSQueue.hpp"
|
||||
|
||||
#include <cstring>
|
||||
|
||||
TSQueue::TSQueue(unsigned int elemSize, unsigned int capacity)
|
||||
: elemSize(elemSize), capacity(capacity), head(0), tail(0), isEmpty(true),
|
||||
spinLock(false) {
|
||||
if (elemSize == 0) {
|
||||
this->elemSize = 1;
|
||||
}
|
||||
if (capacity == 0) {
|
||||
this->capacity = UDPC_TSQUEUE_DEFAULT_CAPACITY * this->elemSize;
|
||||
}
|
||||
|
||||
this->buffer =
|
||||
std::unique_ptr<unsigned char[]>(new unsigned char[this->capacity]);
|
||||
}
|
||||
|
||||
TSQueue::~TSQueue() {}
|
||||
|
||||
bool TSQueue::push(void *data) {
|
||||
while (spinLock.exchange(true) == true) {
|
||||
}
|
||||
if (!isEmpty && head == tail) {
|
||||
spinLock.store(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
memcpy(buffer.get() + tail, data, elemSize);
|
||||
tail = (tail + elemSize) % capacity;
|
||||
|
||||
isEmpty = false;
|
||||
|
||||
spinLock.store(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<unsigned char[]> TSQueue::top() {
|
||||
while (spinLock.exchange(true) == true) {
|
||||
}
|
||||
if (isEmpty) {
|
||||
spinLock.store(false);
|
||||
return std::unique_ptr<unsigned char[]>();
|
||||
}
|
||||
|
||||
auto data = std::unique_ptr<unsigned char[]>(new unsigned char[elemSize]);
|
||||
if (tail != 0) {
|
||||
memcpy(data.get(), buffer.get() + (tail - elemSize), elemSize);
|
||||
} else {
|
||||
memcpy(data.get(), buffer.get() + (capacity - elemSize), elemSize);
|
||||
}
|
||||
spinLock.store(false);
|
||||
return data;
|
||||
}
|
||||
|
||||
bool TSQueue::pop() {
|
||||
while (spinLock.exchange(true) == true) {
|
||||
}
|
||||
if (isEmpty) {
|
||||
spinLock.store(false);
|
||||
return false;
|
||||
}
|
||||
head += elemSize;
|
||||
if (head >= capacity) {
|
||||
head = 0;
|
||||
}
|
||||
if (head == tail) {
|
||||
isEmpty = true;
|
||||
}
|
||||
spinLock.store(false);
|
||||
return true;
|
||||
}
|
37
cpp_impl/src/TSQueue.hpp
Normal file
37
cpp_impl/src/TSQueue.hpp
Normal file
|
@ -0,0 +1,37 @@
|
|||
#ifndef UDPC_THREADSAFE_QUEUE_HPP
|
||||
#define UDPC_THREADSAFE_QUEUE_HPP
|
||||
|
||||
#define UDPC_TSQUEUE_DEFAULT_CAPACITY 32
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdlib>
|
||||
#include <memory>
|
||||
|
||||
class TSQueue {
|
||||
public:
|
||||
TSQueue(unsigned int elemSize,
|
||||
unsigned int capacity = UDPC_TSQUEUE_DEFAULT_CAPACITY);
|
||||
~TSQueue();
|
||||
|
||||
// disable copy
|
||||
TSQueue(const TSQueue &other) = delete;
|
||||
TSQueue &operator=(const TSQueue &other) = delete;
|
||||
// disable move
|
||||
TSQueue(TSQueue &&other) = delete;
|
||||
TSQueue &operator=(TSQueue &&other) = delete;
|
||||
|
||||
bool push(void *data);
|
||||
std::unique_ptr<unsigned char[]> top();
|
||||
bool pop();
|
||||
|
||||
private:
|
||||
unsigned int elemSize;
|
||||
unsigned int capacity;
|
||||
unsigned int head;
|
||||
unsigned int tail;
|
||||
bool isEmpty;
|
||||
std::unique_ptr<unsigned char[]> buffer;
|
||||
std::atomic_bool spinLock;
|
||||
};
|
||||
|
||||
#endif
|
Loading…
Reference in a new issue