Many fixes, including sending packets with payload

Implemented UDPC_get_received
Added UDPC_get_list_connected and UDPC_free_list_connected.
This commit is contained in:
Stephen Seo 2019-09-20 14:01:26 +09:00
parent f6ba9e21b6
commit 5a3c7cd9a0
6 changed files with 151 additions and 27 deletions

View file

@ -28,7 +28,12 @@ class TSQueue {
bool pop(); bool pop();
std::optional<T> top_and_pop(); std::optional<T> top_and_pop();
void clear(); void clear();
void changeCapacity(unsigned int newCapacity); /*
* status ==
* 0 - success
* 1 - success, but previous size was reduced
*/
void changeCapacity(unsigned int newCapacity, unsigned int *status);
unsigned int size(); unsigned int size();
unsigned int capacity(); unsigned int capacity();
bool empty(); bool empty();
@ -119,8 +124,15 @@ void TSQueue<T>::clear() {
} }
template <typename T> template <typename T>
void TSQueue<T>::changeCapacity(unsigned int newCapacity) { void TSQueue<T>::changeCapacity(unsigned int newCapacity, unsigned int *status) {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
if(status) {
if(rb.getSize() < newCapacity) {
*status = 1;
} else {
*status = 0;
}
}
rb.changeCapacity(newCapacity); rb.changeCapacity(newCapacity);
} }

View file

@ -5,7 +5,7 @@
#define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3 #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
#define UDPC_SENT_PKTS_MAX_SIZE 33 #define UDPC_SENT_PKTS_MAX_SIZE 33
#define UDPC_QUEUED_PKTS_MAX_SIZE 32 #define UDPC_QUEUED_PKTS_MAX_SIZE 32
#define UDPC_RECEIVED_PKTS_MAX_SIZE 50 #define UDPC_RECEIVED_PKTS_MAX_SIZE 64
#define UDPC_ID_CONNECT 0x80000000 #define UDPC_ID_CONNECT 0x80000000
#define UDPC_ID_PING 0x40000000 #define UDPC_ID_PING 0x40000000
@ -103,7 +103,6 @@ struct ConnectionData {
std::deque<UDPC_PacketInfo> sentPkts; std::deque<UDPC_PacketInfo> sentPkts;
TSQueue<UDPC_PacketInfo> sendPkts; TSQueue<UDPC_PacketInfo> sendPkts;
TSQueue<UDPC_PacketInfo> priorityPkts; TSQueue<UDPC_PacketInfo> priorityPkts;
TSQueue<UDPC_PacketInfo> receivedPkts;
// pkt id to pkt shared_ptr // pkt id to pkt shared_ptr
std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap; std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap;
std::chrono::steady_clock::time_point received; std::chrono::steady_clock::time_point received;
@ -253,6 +252,7 @@ public:
std::unordered_map<struct in6_addr, std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher>, IPV6_Hasher> addrConMap; std::unordered_map<struct in6_addr, std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher>, IPV6_Hasher> addrConMap;
// id to ipv6 address and port (as UDPC_ConnectionId) // id to ipv6 address and port (as UDPC_ConnectionId)
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap; std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
TSQueue<UDPC_PacketInfo> receivedPkts;
std::default_random_engine rng_engine; std::default_random_engine rng_engine;

View file

@ -13,6 +13,7 @@
#include <ios> #include <ios>
#include <iomanip> #include <iomanip>
#include <regex> #include <regex>
#include <cstdlib>
#if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS #if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS
#include <netioapi.h> #include <netioapi.h>
@ -90,7 +91,6 @@ port(0),
sentPkts(), sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
received(std::chrono::steady_clock::now()), received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()), sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero()) rtt(std::chrono::steady_clock::duration::zero())
@ -120,7 +120,6 @@ port(port),
sentPkts(), sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
received(std::chrono::steady_clock::now()), received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()), sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero()) rtt(std::chrono::steady_clock::duration::zero())
@ -157,9 +156,14 @@ loggingType(UDPC_INFO),
loggingType(UDPC_WARNING), loggingType(UDPC_WARNING),
#endif #endif
atostrBufIndex(0), atostrBufIndex(0),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
rng_engine(), rng_engine(),
mutex() mutex()
{ {
for(unsigned int i = 0; i < UDPC_ATOSTR_SIZE; ++i) {
atostrBuf[i] = 0;
}
if(isThreaded) { if(isThreaded) {
flags.set(0); flags.set(0);
} else { } else {
@ -389,7 +393,10 @@ void UDPC::Context::update_impl() {
UDPC_IPV6_SOCKADDR_TYPE destinationInfo; UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6; destinationInfo.sin6_family = AF_INET6;
std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port); destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0; destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id; destinationInfo.sin6_scope_id = iter->first.scope_id;
@ -411,6 +418,7 @@ void UDPC::Context::update_impl() {
} }
UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
pInfo.flags = 0x4;
pInfo.sender.addr = in6addr_loopback; pInfo.sender.addr = in6addr_loopback;
pInfo.receiver.addr = iter->first.addr; pInfo.receiver.addr = iter->first.addr;
pInfo.sender.port = socketInfo.sin6_port; pInfo.sender.port = socketInfo.sin6_port;
@ -450,8 +458,13 @@ void UDPC::Context::update_impl() {
UDPC_IPV6_SOCKADDR_TYPE destinationInfo; UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6; destinationInfo.sin6_family = AF_INET6;
std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port); destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto( long int sentBytes = sendto(
socketHandle, socketHandle,
buf.get(), buf.get(),
@ -480,7 +493,7 @@ void UDPC::Context::update_impl() {
sentPInfo.sender.port = socketInfo.sin6_port; sentPInfo.sender.port = socketInfo.sin6_port;
sentPInfo.receiver.port = iter->second.port; sentPInfo.receiver.port = iter->second.port;
iter->second.sentPkts.push_back(std::move(pInfo)); iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts(); iter->second.cleanupSentPkts();
} else { } else {
// is not check-received, only id stored in data array // is not check-received, only id stored in data array
@ -491,9 +504,9 @@ void UDPC::Context::update_impl() {
sentPInfo.receiver.addr = iter->first.addr; sentPInfo.receiver.addr = iter->first.addr;
sentPInfo.sender.port = socketInfo.sin6_port; sentPInfo.sender.port = socketInfo.sin6_port;
sentPInfo.receiver.port = iter->second.port; sentPInfo.receiver.port = iter->second.port;
*((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1; *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
iter->second.sentPkts.push_back(std::move(pInfo)); iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts(); iter->second.cleanupSentPkts();
} }
@ -502,6 +515,7 @@ void UDPC::Context::update_impl() {
sentPktInfo->id = iter->second.lseq - 1; sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} }
iter->second.sent = now;
} }
// receive packet // receive packet
@ -785,14 +799,13 @@ void UDPC::Context::update_impl() {
recPktInfo.sender.port = ntohs(receivedData.sin6_port); recPktInfo.sender.port = ntohs(receivedData.sin6_port);
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port); recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) { if(!receivedPkts.push(recPktInfo)) {
log( log(
UDPC_LoggingType::UDPC_WARNING, UDPC_LoggingType::UDPC_WARNING,
"receivedPkts is full, removing oldest entry to make room"); "receivedPkts is full, removing oldest entry to make room");
iter->second.receivedPkts.pop(); receivedPkts.pop();
receivedPkts.push(recPktInfo);
} }
iter->second.receivedPkts.push(recPktInfo);
} else if(bytes == 20) { } else if(bytes == 20) {
log( log(
UDPC_LoggingType::UDPC_VERBOSE, UDPC_LoggingType::UDPC_VERBOSE,
@ -903,7 +916,7 @@ void UDPC::threadedUpdate(Context *ctx) {
ctx->update_impl(); ctx->update_impl();
ctx->mutex.unlock(); ctx->mutex.unlock();
nextNow = std::chrono::steady_clock::now(); nextNow = std::chrono::steady_clock::now();
std::this_thread::sleep_for(std::chrono::milliseconds(33) - (nextNow - now)); std::this_thread::sleep_for(std::chrono::milliseconds(11) - (nextNow - now));
} }
} }
@ -1095,7 +1108,7 @@ int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectio
} }
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
uint32_t isChecked, void *data, uint32_t size) { int isChecked, void *data, uint32_t size) {
if(size == 0 || !data) { if(size == 0 || !data) {
return; return;
} }
@ -1123,7 +1136,7 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
sendInfo.sender.port = c->socketInfo.sin6_port; sendInfo.sender.port = c->socketInfo.sin6_port;
sendInfo.receiver.addr = destinationId.addr; sendInfo.receiver.addr = destinationId.addr;
sendInfo.receiver.port = iter->second.port; sendInfo.receiver.port = iter->second.port;
sendInfo.flags = (isChecked ? 0x0 : 0x4); sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4);
iter->second.sendPkts.push(sendInfo); iter->second.sendPkts.push(sendInfo);
} }
@ -1193,6 +1206,36 @@ int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1; return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
} }
UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return nullptr;
}
std::lock_guard<std::mutex> lock(c->mutex);
if(c->conMap.empty()) {
return nullptr;
}
if(size) {
*size = c->conMap.size();
}
UDPC_ConnectionId *list = (UDPC_ConnectionId*)std::malloc(
sizeof(UDPC_ConnectionId) * (c->conMap.size() + 1));
UDPC_ConnectionId *current = list;
for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
*current = iter->first;
++current;
}
*current = UDPC_ConnectionId{{0}, 0, 0};
return list;
}
void UDPC_free_list_connected(UDPC_ConnectionId *list) {
std::free(list);
}
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) { uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) { if(!c) {
@ -1210,18 +1253,41 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi
return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType)); return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
} }
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx) { UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) { if(!c) {
return UDPC::get_empty_pinfo(); return UDPC::get_empty_pinfo();
} }
std::lock_guard<std::mutex> lock(c->mutex); auto opt_pinfo = c->receivedPkts.top_and_pop();
if(remaining) {
// TODO impl *remaining = c->receivedPkts.size();
}
if(opt_pinfo) {
return *opt_pinfo;
}
return UDPC::get_empty_pinfo(); return UDPC::get_empty_pinfo();
} }
int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity) {
if(newCapacity == 0) {
return 0;
}
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
unsigned int status = 0;
c->receivedPkts.changeCapacity(newCapacity, &status);
if(status == 1) {
c->log(UDPC_LoggingType::UDPC_WARNING,
"Received Queue: Previous size was truncated to new capacity");
}
return 1;
}
const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
return UDPC_atostr(ctx, connectionId.addr); return UDPC_atostr(ctx, connectionId.addr);
} }
@ -1255,7 +1321,8 @@ const char *UDPC_atostr(UDPC_HContext ctx, UDPC_IPV6_ADDR_TYPE addr) {
} }
} }
if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0 && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) { if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0
&& (headIndex - index <= 1 || c->atostrBuf[index] == ':')) {
continue; continue;
} else { } else {
std::stringstream sstream; std::stringstream sstream;

View file

@ -103,7 +103,7 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId); int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
uint32_t isChecked, void *data, uint32_t size); int isChecked, void *data, uint32_t size);
int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting); int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting);
@ -111,11 +111,17 @@ int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool
int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId); int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size);
void UDPC_free_list_connected(UDPC_ConnectionId *list);
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id); uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id);
UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType); UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx); UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining);
int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity);
const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId); const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId);

View file

@ -62,7 +62,7 @@ TEST(TSQueue, Usage)
EXPECT_FALSE(q.push(temp)); EXPECT_FALSE(q.push(temp));
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
q.changeCapacity(8); q.changeCapacity(8, nullptr);
EXPECT_EQ(q.size(), 4); EXPECT_EQ(q.size(), 4);
temp = 10; temp = 10;
@ -87,7 +87,7 @@ TEST(TSQueue, Usage)
EXPECT_EQ(400, q.top()); EXPECT_EQ(400, q.top());
q.changeCapacity(1); q.changeCapacity(1, nullptr);
// { 10 } // { 10 }

View file

@ -4,6 +4,7 @@
#include <thread> #include <thread>
#include <chrono> #include <chrono>
#include <regex> #include <regex>
#include <vector>
#include <UDPConnection.h> #include <UDPConnection.h>
@ -15,6 +16,8 @@ void usage() {
puts("-lp <port> - listen port"); puts("-lp <port> - listen port");
puts("-cl <addr> - connection addr (client only)"); puts("-cl <addr> - connection addr (client only)");
puts("-cp <port> - connection port (client only)"); puts("-cp <port> - connection port (client only)");
puts("-t <tick_count>");
puts("-n - do not add payload to packets");
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
@ -30,6 +33,7 @@ int main(int argc, char **argv) {
const char *connectionAddr = nullptr; const char *connectionAddr = nullptr;
const char *connectionPort = nullptr; const char *connectionPort = nullptr;
unsigned int tickLimit = 15; unsigned int tickLimit = 15;
bool noPayload = false;
while(argc > 0) { while(argc > 0) {
if(std::strcmp(argv[0], "-c") == 0) { if(std::strcmp(argv[0], "-c") == 0) {
isClient = true; isClient = true;
@ -51,6 +55,9 @@ int main(int argc, char **argv) {
--argc; ++argv; --argc; ++argv;
tickLimit = std::atoi(argv[0]); tickLimit = std::atoi(argv[0]);
printf("Set tick limit to %u\n", tickLimit); printf("Set tick limit to %u\n", tickLimit);
} else if(std::strcmp(argv[0], "-n") == 0) {
noPayload = true;
puts("Disabling sending payload");
} else { } else {
printf("ERROR: invalid argument \"%s\"\n", argv[0]); printf("ERROR: invalid argument \"%s\"\n", argv[0]);
usage(); usage();
@ -101,11 +108,43 @@ int main(int argc, char **argv) {
} }
UDPC_set_logging_type(context, UDPC_LoggingType::UDPC_INFO); UDPC_set_logging_type(context, UDPC_LoggingType::UDPC_INFO);
unsigned int tick = 0; unsigned int tick = 0;
unsigned int temp = 0;
unsigned int temp2, temp3;
UDPC_ConnectionId *list = nullptr;
std::vector<unsigned int> sendIds;
UDPC_PacketInfo received;
while(true) { while(true) {
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
if(isClient && UDPC_has_connection(context, connectionId) == 0) { if(isClient && UDPC_has_connection(context, connectionId) == 0) {
UDPC_client_initiate_connection(context, connectionId); UDPC_client_initiate_connection(context, connectionId);
} }
if(!noPayload) {
list = UDPC_get_list_connected(context, &temp);
if(list) {
if(sendIds.size() < temp) {
sendIds.resize(temp, 0);
} else if(sendIds.size() > temp) {
sendIds.resize(temp);
}
for(unsigned int i = 0; i < temp; ++i) {
temp2 = UDPC_get_queue_send_available(context, list[i]);
for(unsigned int j = 0; j < temp2; ++j) {
temp3 = htonl(sendIds[i]++);
UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int));
}
}
UDPC_free_list_connected(list);
}
do {
received = UDPC_get_received(context, &temp);
if(received.dataSize == sizeof(unsigned int)) {
if((received.flags & 0x8) != 0) {
temp2 = ntohl(*((unsigned int*)received.data));
printf("Got out of order, data = %u\n", temp2);
}
}
} while (temp > 0);
}
if(tick++ > tickLimit) { if(tick++ > tickLimit) {
break; break;
} }