Fixes and improvements (add use of TSLQueue)

Replace "unsigned long long" in TSLQueue with "unsigned long" to keep
compatibility with C.

Add top_and_pop_and_rsize() to TSLQueue.

Fix log levels in UDPC.

Replace TSQueue with TSLQueue in UDPC_Context.
Also fix NetworkTest with TSLQueue related changes.
This commit is contained in:
Stephen Seo 2019-11-06 14:35:16 +09:00
parent 742db465dd
commit 7c889eee6a
5 changed files with 145 additions and 89 deletions

View file

@ -30,10 +30,11 @@ class TSLQueue {
bool pop();
std::optional<T> top_and_pop();
std::optional<T> top_and_pop_and_empty(bool *isEmpty);
std::optional<T> top_and_pop_and_rsize(unsigned long *rsize);
void clear();
bool empty();
unsigned long long size();
unsigned long size();
private:
struct TSLQNode {
@ -63,7 +64,7 @@ class TSLQueue {
public:
TSLQIter(std::mutex &mutex,
std::weak_ptr<TSLQNode> currentNode,
unsigned long long *msize);
unsigned long *msize);
~TSLQIter();
std::optional<T> current();
@ -74,7 +75,7 @@ class TSLQueue {
private:
std::lock_guard<std::mutex> lock;
std::weak_ptr<TSLQNode> currentNode;
unsigned long long *const msize;
unsigned long *const msize;
};
@ -85,7 +86,7 @@ class TSLQueue {
std::mutex mutex;
std::shared_ptr<TSLQNode> head;
std::shared_ptr<TSLQNode> tail;
unsigned long long msize;
unsigned long msize;
};
template <typename T>
@ -243,6 +244,31 @@ std::optional<T> TSLQueue<T>::top_and_pop_and_empty(bool *isEmpty) {
return ret;
}
template <typename T>
std::optional<T> TSLQueue<T>::top_and_pop_and_rsize(unsigned long *rsize) {
std::optional<T> ret = std::nullopt;
std::lock_guard lock(mutex);
if(head->next == tail) {
if(rsize) {
*rsize = 0;
}
} else {
assert(head->next->data);
ret = *head->next->data.get();
auto& newNext = head->next->next;
newNext->prev = head;
head->next = newNext;
assert(msize > 0);
--msize;
if(rsize) {
*rsize = msize;
}
}
return ret;
}
template <typename T>
void TSLQueue<T>::clear() {
std::lock_guard lock(mutex);
@ -259,7 +285,7 @@ bool TSLQueue<T>::empty() {
}
template <typename T>
unsigned long long TSLQueue<T>::size() {
unsigned long TSLQueue<T>::size() {
std::lock_guard lock(mutex);
return msize;
}
@ -277,7 +303,7 @@ bool TSLQueue<T>::TSLQNode::isNormal() const {
template <typename T>
TSLQueue<T>::TSLQIter::TSLQIter(std::mutex &mutex,
std::weak_ptr<TSLQNode> currentNode,
unsigned long long *msize) :
unsigned long *msize) :
lock(mutex),
currentNode(currentNode),
msize(msize)

View file

@ -33,6 +33,7 @@
#include <iostream>
#include "TSQueue.hpp"
#include "TSLQueue.hpp"
#include "UDPConnection.h"
#include <sodium.h>
@ -153,6 +154,9 @@ private:
case UDPC_LoggingType::UDPC_INFO:
std::cerr << "INFO: ";
break;
case UDPC_LoggingType::UDPC_DEBUG:
std::cerr << "DEBUG: ";
break;
default:
return;
}
@ -197,8 +201,8 @@ public:
std::unordered_map<struct in6_addr, std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher>, IPV6_Hasher> addrConMap;
// id to ipv6 address and port (as UDPC_ConnectionId)
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
TSQueue<UDPC_PacketInfo> receivedPkts;
TSQueue<UDPC_PacketInfo> cSendPkts;
TSLQueue<UDPC_PacketInfo> receivedPkts;
TSLQueue<UDPC_PacketInfo> cSendPkts;
std::default_random_engine rng_engine;

View file

@ -165,13 +165,13 @@ flags(),
isAcceptNewConnections(true),
protocolID(UDPC_DEFAULT_PROTOCOL_ID),
#ifndef NDEBUG
loggingType(UDPC_INFO),
loggingType(UDPC_DEBUG),
#else
loggingType(UDPC_WARNING),
#endif
atostrBufIndex(0),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
cSendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
receivedPkts(),
cSendPkts(),
rng_engine(),
mutex()
{
@ -199,12 +199,21 @@ bool UDPC::Context::willLog(UDPC_LoggingType type) {
case UDPC_LoggingType::UDPC_WARNING:
return type == UDPC_LoggingType::UDPC_ERROR
|| type == UDPC_LoggingType::UDPC_WARNING;
case UDPC_LoggingType::UDPC_INFO:
return type == UDPC_LoggingType::UDPC_ERROR
|| type == UDPC_LoggingType::UDPC_WARNING
|| type == UDPC_LoggingType::UDPC_INFO;
case UDPC_LoggingType::UDPC_VERBOSE:
return type == UDPC_LoggingType::UDPC_ERROR
|| type == UDPC_LoggingType::UDPC_WARNING
|| type == UDPC_LoggingType::UDPC_INFO
|| type == UDPC_LoggingType::UDPC_VERBOSE;
case UDPC_LoggingType::UDPC_INFO:
return type != UDPC_LoggingType::UDPC_SILENT;
case UDPC_LoggingType::UDPC_DEBUG:
return type == UDPC_LoggingType::UDPC_ERROR
|| type == UDPC_LoggingType::UDPC_WARNING
|| type == UDPC_LoggingType::UDPC_INFO
|| type == UDPC_LoggingType::UDPC_VERBOSE
|| type == UDPC_LoggingType::UDPC_DEBUG;
default:
return false;
}
@ -238,7 +247,7 @@ void UDPC::Context::update_impl() {
if(iter->second.flags.test(1) && !iter->second.flags.test(2)) {
// good mode, bad rtt
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Switching to bad mode in connection with ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
@ -264,7 +273,7 @@ void UDPC::Context::update_impl() {
iter->second.toggleTimer = std::chrono::steady_clock::duration::zero();
iter->second.toggledTimer = std::chrono::steady_clock::duration::zero();
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Switching to good mode in connection with ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
@ -315,13 +324,33 @@ void UDPC::Context::update_impl() {
// move queued in cSendPkts to existing connection's sendPkts
{
unsigned int rsize = 0;
do {
auto next = cSendPkts.top_and_pop_and_rsize(&rsize);
auto sendIter = cSendPkts.begin();
while(true) {
auto next = sendIter.current();
if(next) {
if(auto iter = conMap.find(next.value().receiver);
iter != conMap.end()) {
if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_DEBUG,
"Not queueing packet to ",
UDPC_atostr((UDPC_HContext)this,
next.value().receiver.addr),
", port = ",
next.value().receiver.port,
", connection's queue reached max size");
if(sendIter.next()) {
continue;
} else {
break;
}
}
iter->second.sendPkts.push_back(next.value());
if(sendIter.remove()) {
continue;
} else {
break;
}
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
@ -332,9 +361,16 @@ void UDPC::Context::update_impl() {
", port = ",
next.value().receiver.port,
" due to connection not existing");
if(sendIter.remove()) {
continue;
} else {
break;
}
}
} else {
break;
}
} while(rsize != 0);
}
}
// update send (only if triggerSend flag is set)
@ -618,7 +654,7 @@ void UDPC::Context::update_impl() {
else if(bytes < UDPC_MIN_HEADER_SIZE) {
// packet size is too small, invalid packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet is smaller than header, ignoring packet from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -630,7 +666,7 @@ void UDPC::Context::update_impl() {
if(temp != protocolID) {
// Invalid protocol id in packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet has invalid protocol id, ignoring packet from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -652,7 +688,7 @@ void UDPC::Context::update_impl() {
if(isConnect && bytes != UDPC_CON_HEADER_SIZE) {
// invalid packet size
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Got connect packet of invalid size from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -662,7 +698,7 @@ void UDPC::Context::update_impl() {
} else if (!isConnect && bytes < (int)UDPC_FULL_HEADER_SIZE) {
// packet is too small
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Got non-connect packet of invalid size from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -692,7 +728,7 @@ void UDPC::Context::update_impl() {
std::memcpy(newConnection.peer_pk, recvBuf + UDPC_MIN_HEADER_SIZE,
crypto_sign_PUBLICKEYBYTES);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
UDPC_LoggingType::UDPC_INFO,
"Establishing connection with client ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -726,7 +762,7 @@ void UDPC::Context::update_impl() {
std::memcpy(iter->second.peer_pk, recvBuf + UDPC_MIN_HEADER_SIZE,
crypto_sign_PUBLICKEYBYTES);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
UDPC_LoggingType::UDPC_INFO,
"Established connection with server ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -753,7 +789,7 @@ void UDPC::Context::update_impl() {
iter->second.peer_pk) != 0) {
UDPC_CHECK_LOG(
this,
UDPC_LoggingType::UDPC_VERBOSE,
UDPC_LoggingType::UDPC_INFO,
"Failed to verify received packet from",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -764,7 +800,7 @@ void UDPC::Context::update_impl() {
// packet is valid
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Received valid packet from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
@ -790,7 +826,7 @@ void UDPC::Context::update_impl() {
iter->second.flags.set(2, iter->second.rtt <= UDPC::GOOD_RTT_LIMIT);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"RTT: ",
UDPC::durationToFSec(iter->second.rtt) * 1000.0f,
" milliseconds");
@ -823,7 +859,7 @@ void UDPC::Context::update_impl() {
if(duration > UDPC::PACKET_TIMEOUT_TIME) {
if(sentIter->dataSize <= UDPC_FULL_HEADER_SIZE) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Timed out packet has no payload (probably "
"heartbeat packet), ignoring it");
sentIter->flags |= 0x8;
@ -858,7 +894,7 @@ void UDPC::Context::update_impl() {
if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) {
// already received packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet is already marked as received, ignoring it");
return;
}
@ -872,7 +908,7 @@ void UDPC::Context::update_impl() {
if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) {
// already received packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet is already marked as received, ignoring it");
return;
}
@ -887,14 +923,14 @@ void UDPC::Context::update_impl() {
} else {
// already received packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet is already marked as received, ignoring it");
return;
}
if(isOutOfOrder) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
UDPC_LoggingType::UDPC_INFO,
"Received packet is out of order");
}
@ -912,13 +948,7 @@ void UDPC::Context::update_impl() {
recPktInfo.sender.port = ntohs(receivedData.sin6_port);
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
if(!receivedPkts.push(recPktInfo)) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"receivedPkts is full, removing oldest entry to make room");
receivedPkts.pop();
receivedPkts.push(recPktInfo);
}
receivedPkts.push(recPktInfo);
} else if(bytes == UDPC_FULL_HEADER_SIZE) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
@ -1228,21 +1258,12 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
addrConIter = insertResult.first;
}
addrConIter->second.insert(connectionId);
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_VERBOSE, "client_initiate_connection: Initiating connection...");
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Initiating connection...");
} else {
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_ERROR, "client_initiate_connection: Already connected to peer");
}
}
int UDPC_get_queue_send_available(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
return c->cSendPkts.remaining_capacity();
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
int isChecked, void *data, uint32_t size) {
if(size == 0 || !data) {
@ -1252,15 +1273,6 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return;
} else if(c->cSendPkts.full()) {
UDPC_CHECK_LOG(c,
UDPC_LoggingType::UDPC_ERROR,
"Failed to queue packet to ",
UDPC_atostr(ctx, destinationId.addr),
", port = ",
destinationId.port,
" because queue is full");
return;
}
UDPC_PacketInfo sendInfo = UDPC::get_empty_pinfo();
@ -1275,6 +1287,15 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
c->cSendPkts.push(sendInfo);
}
unsigned long UDPC_get_queue_send_current_size(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
return c->cSendPkts.size();
}
int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
@ -1386,7 +1407,7 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi
return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
}
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining) {
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return UDPC::get_empty_pinfo();
@ -1399,25 +1420,6 @@ UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining) {
return UDPC::get_empty_pinfo();
}
int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity) {
if(newCapacity == 0) {
return 0;
}
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
unsigned int status = 0;
c->receivedPkts.changeCapacity(newCapacity, &status);
if(status == 1) {
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_WARNING,
"Received Queue: Previous size was truncated to new capacity");
}
return 1;
}
const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
return UDPC_atostr(ctx, connectionId.addr);
}

View file

@ -65,7 +65,7 @@ extern "C" {
struct UDPC_Context;
typedef struct UDPC_Context *UDPC_HContext;
typedef enum { UDPC_SILENT, UDPC_ERROR, UDPC_WARNING, UDPC_VERBOSE, UDPC_INFO } UDPC_LoggingType;
typedef enum { UDPC_SILENT, UDPC_ERROR, UDPC_WARNING, UDPC_INFO, UDPC_VERBOSE, UDPC_DEBUG } UDPC_LoggingType;
typedef struct {
UDPC_IPV6_ADDR_TYPE addr;
@ -105,11 +105,11 @@ void UDPC_update(UDPC_HContext ctx);
void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
int UDPC_get_queue_send_available(UDPC_HContext ctx);
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
int isChecked, void *data, uint32_t size);
unsigned long UDPC_get_queue_send_current_size(UDPC_HContext ctx);
int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting);
int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr);
@ -124,9 +124,7 @@ uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id);
UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining);
int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity);
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining);
const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId);

View file

@ -8,6 +8,8 @@
#include <UDPConnection.h>
#define QUEUED_MAX_SIZE 32
static const std::regex ipv6_regex_linkonly = std::regex(R"d(fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,})d");
void usage() {
@ -18,6 +20,7 @@ void usage() {
puts("-cp <port> - connection port (client only)");
puts("-t <tick_count>");
puts("-n - do not add payload to packets");
puts("-l (silent|error|warning|info|verbose|debug) - log level, default debug");
}
int main(int argc, char **argv) {
@ -34,6 +37,7 @@ int main(int argc, char **argv) {
const char *connectionPort = nullptr;
unsigned int tickLimit = 15;
bool noPayload = false;
UDPC_LoggingType logLevel = UDPC_LoggingType::UDPC_DEBUG;
while(argc > 0) {
if(std::strcmp(argv[0], "-c") == 0) {
isClient = true;
@ -58,6 +62,26 @@ int main(int argc, char **argv) {
} else if(std::strcmp(argv[0], "-n") == 0) {
noPayload = true;
puts("Disabling sending payload");
} else if(std::strcmp(argv[0], "-l") == 0) {
--argc; ++argv;
if(std::strcmp(argv[0], "silent") == 0) {
logLevel = UDPC_LoggingType::UDPC_SILENT;
} else if(std::strcmp(argv[0], "error") == 0) {
logLevel = UDPC_LoggingType::UDPC_ERROR;
} else if(std::strcmp(argv[0], "warning") == 0) {
logLevel = UDPC_LoggingType::UDPC_WARNING;
} else if(std::strcmp(argv[0], "info") == 0) {
logLevel = UDPC_LoggingType::UDPC_INFO;
} else if(std::strcmp(argv[0], "verbose") == 0) {
logLevel = UDPC_LoggingType::UDPC_VERBOSE;
} else if(std::strcmp(argv[0], "debug") == 0) {
logLevel = UDPC_LoggingType::UDPC_DEBUG;
} else {
printf("ERROR: invalid argument \"%s\", expected "
"silent|error|warning|info|verbose|debug", argv[0]);
usage();
return 1;
}
} else {
printf("ERROR: invalid argument \"%s\"\n", argv[0]);
usage();
@ -106,10 +130,11 @@ int main(int argc, char **argv) {
puts("ERROR: context is NULL");
return 1;
}
UDPC_set_logging_type(context, UDPC_LoggingType::UDPC_INFO);
UDPC_set_logging_type(context, logLevel);
unsigned int tick = 0;
unsigned int temp = 0;
unsigned int temp2, temp3;
unsigned long size;
UDPC_ConnectionId *list = nullptr;
std::vector<unsigned int> sendIds;
UDPC_PacketInfo received;
@ -126,7 +151,8 @@ int main(int argc, char **argv) {
} else if(sendIds.size() > temp) {
sendIds.resize(temp);
}
temp2 = UDPC_get_queue_send_available(context);
size = UDPC_get_queue_send_current_size(context);
temp2 = size < QUEUED_MAX_SIZE ? QUEUED_MAX_SIZE - size : 0;
for(unsigned int i = 0; i < temp2; ++i) {
temp3 = htonl(sendIds[i % temp]++);
UDPC_queue_send(context, list[i % temp], 0, &temp3, sizeof(unsigned int));
@ -134,14 +160,14 @@ int main(int argc, char **argv) {
UDPC_free_list_connected(list);
}
do {
received = UDPC_get_received(context, &temp);
received = UDPC_get_received(context, &size);
if(received.dataSize == sizeof(unsigned int)) {
if((received.flags & 0x8) != 0) {
temp2 = ntohl(*((unsigned int*)received.data));
printf("Got out of order, data = %u\n", temp2);
}
}
} while (temp > 0);
} while (size > 0);
}
if(tick++ > tickLimit) {
break;