Impl NetworkTest, fixes and refactoring

Basic connections can now be tested with NetworkTest.
Also includes fixes to UDPC that now create a working connection.

TODO Fix client sending at good rate even in bad rate mode.
This commit is contained in:
Stephen Seo 2019-09-17 20:33:47 +09:00
parent a642db53f0
commit 2bc6eeffe9
4 changed files with 154 additions and 52 deletions

View file

@ -69,7 +69,7 @@ struct IPV6_Hasher {
struct ConnectionData {
ConnectionData();
ConnectionData(bool isServer, Context *ctx);
ConnectionData(bool isServer, Context *ctx, struct in6_addr addr, uint16_t port);
// copy
ConnectionData(const ConnectionData& other) = delete;

View file

@ -57,6 +57,8 @@ timer(std::chrono::steady_clock::duration::zero()),
toggleT(UDPC::THIRTY_SECONDS),
toggleTimer(std::chrono::steady_clock::duration::zero()),
toggledTimer(std::chrono::steady_clock::duration::zero()),
addr({0}),
port(0),
sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
@ -66,10 +68,28 @@ sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
{
flags.set(0);
flags.reset(1);
}
UDPC::ConnectionData::ConnectionData(bool isServer, Context *ctx) :
UDPC::ConnectionData::ConnectionData()
UDPC::ConnectionData::ConnectionData(bool isServer, Context *ctx, struct in6_addr addr, uint16_t port) :
flags(),
id(0),
lseq(0),
rseq(0),
ack(0xFFFFFFFF),
timer(std::chrono::steady_clock::duration::zero()),
toggleT(UDPC::THIRTY_SECONDS),
toggleTimer(std::chrono::steady_clock::duration::zero()),
toggledTimer(std::chrono::steady_clock::duration::zero()),
addr(addr),
port(port),
sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
{
flags.set(3);
if(isServer) {
@ -83,7 +103,7 @@ UDPC::ConnectionData::ConnectionData()
void UDPC::ConnectionData::cleanupSentPkts() {
uint32_t id;
while(sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) {
id = *((uint32_t*)(sentPkts.front().data + 8));
id = ntohl(*((uint32_t*)(sentPkts.front().data + 8)));
auto iter = sentInfoMap.find(id);
assert(iter != sentInfoMap.end()
&& "Sent packet must have correspoding entry in sentInfoMap");
@ -111,6 +131,7 @@ mutex()
} else {
flags.reset(0);
}
flags.set(2);
rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count());
@ -133,7 +154,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::VERBOSE,
"Timed out connection with ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
continue;
}
@ -147,7 +168,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::INFO,
"Switching to bad mode in connection with ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
iter->second.flags.reset(1);
if(iter->second.toggledTimer <= UDPC::TEN_SECONDS) {
@ -173,7 +194,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::INFO,
"Switching to good mode in connection with ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
iter->second.flags.set(1);
}
@ -183,12 +204,16 @@ void UDPC::Context::update_impl() {
}
iter->second.timer += temp_dt_fs;
if(iter->second.timer >= (iter->second.flags.test(1)
? UDPC::GOOD_MODE_SEND_RATE
: UDPC::BAD_MODE_SEND_RATE)) {
iter->second.timer -= (iter->second.flags.test(1)
? UDPC::GOOD_MODE_SEND_RATE : UDPC::BAD_MODE_SEND_RATE);
iter->second.flags.set(0);
if(iter->second.flags.test(1)) {
if(iter->second.timer >= UDPC::GOOD_MODE_SEND_RATE) {
iter->second.timer -= UDPC::GOOD_MODE_SEND_RATE;
iter->second.flags.set(0);
}
} else {
if(iter->second.timer >= UDPC::BAD_MODE_SEND_RATE) {
iter->second.timer -= UDPC::BAD_MODE_SEND_RATE;
iter->second.flags.set(0);
}
}
}
for(auto iter = removed.begin(); iter != removed.end(); ++iter) {
@ -245,6 +270,8 @@ void UDPC::Context::update_impl() {
destinationInfo.sin6_family = AF_INET6;
std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = 0;
long int sentBytes = sendto(
socketHandle,
buf.get(),
@ -257,9 +284,14 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::ERROR,
"Failed to send packet to initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
continue;
} else {
log(UDPC_LoggingType::INFO, "Sent initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first),
", port = ",
iter->second.port);
}
} else {
// is server, initiate connection to client
@ -280,6 +312,8 @@ void UDPC::Context::update_impl() {
destinationInfo.sin6_family = AF_INET6;
std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = 0;
long int sentBytes = sendto(
socketHandle,
buf.get(),
@ -292,7 +326,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::ERROR,
"Failed to send packet to initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
continue;
}
@ -322,6 +356,8 @@ void UDPC::Context::update_impl() {
destinationInfo.sin6_family = AF_INET6;
std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = 0;
long int sentBytes = sendto(
socketHandle,
buf.get(),
@ -334,7 +370,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::ERROR,
"Failed to send heartbeat packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
continue;
}
@ -344,7 +380,7 @@ void UDPC::Context::update_impl() {
pInfo.receiver.addr = iter->first.addr;
pInfo.sender.port = socketInfo.sin6_port;
pInfo.receiver.port = iter->second.port;
*((uint32_t*)(pInfo.data + 8)) = iter->second.lseq - 1;
*((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
iter->second.sentPkts.push_back(std::move(pInfo));
iter->second.cleanupSentPkts();
@ -393,7 +429,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::ERROR,
"Failed to send packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first),
":",
", port = ",
iter->second.port);
continue;
}
@ -456,7 +492,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::INFO,
"Received packet is smaller than header, ignoring packet from ",
UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}),
":",
", port = ",
receivedData.sin6_port);
return;
}
@ -468,15 +504,15 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::INFO,
"Received packet has invalid protocol id, ignoring packet from ",
UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}),
":",
receivedData.sin6_port);
", port = ",
ntohs(receivedData.sin6_port));
return;
}
uint32_t conID = ntohl(*((uint32_t*)(recvBuf + 4)));
uint32_t seqID = ntohl(*((uint32_t*)(recvBuf + 8)));
uint32_t rseq = ntohl(*((uint32_t*)(recvBuf + 12)));
uint32_t ack = htonl(*((uint32_t*)(recvBuf + 16)));
uint32_t ack = ntohl(*((uint32_t*)(recvBuf + 16)));
bool isConnect = conID & UDPC_ID_CONNECT;
bool isPing = conID & UDPC_ID_PING;
@ -491,15 +527,14 @@ void UDPC::Context::update_impl() {
if(!flags.test(1)
&& conMap.find(identifier) == conMap.end()) {
// is receiving as server, connection did not already exist
UDPC::ConnectionData newConnection(true, this, receivedData.sin6_addr, ntohs(receivedData.sin6_port));
log(
UDPC_LoggingType::VERBOSE,
"Establishing connection with client ",
UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}),
":",
receivedData.sin6_port);
UDPC::ConnectionData newConnection(true, this);
newConnection.addr = receivedData.sin6_addr;
newConnection.port = ntohs(receivedData.sin6_port);
", port = ",
ntohs(receivedData.sin6_port),
", giving client id = ", newConnection.id);
idMap.insert(std::make_pair(newConnection.id, identifier));
conMap.insert(std::make_pair(identifier, std::move(newConnection)));
@ -529,8 +564,9 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::VERBOSE,
"Established connection with server ",
UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}),
":",
receivedData.sin6_port);
", port = ",
ntohs(receivedData.sin6_port),
", got id = ", conID);
// TODO trigger event client established connection with server
}
return;
@ -540,8 +576,7 @@ void UDPC::Context::update_impl() {
if(iter == conMap.end() || iter->second.flags.test(3)
|| !iter->second.flags.test(4) || iter->second.id != conID) {
return;
}
else if(isPing) {
} else if(isPing) {
iter->second.flags.set(0);
}
@ -550,8 +585,11 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::INFO,
"Received valid packet from ",
UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}),
":",
receivedData.sin6_port);
", port = ",
ntohs(receivedData.sin6_port),
", packet id = ", seqID,
", good mode = ", iter->second.flags.test(1) ? "yes" : "no",
isPing ? ", ping" : "");
// update rtt
for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) {
@ -572,7 +610,8 @@ void UDPC::Context::update_impl() {
log(
UDPC_LoggingType::INFO,
"RTT: ",
UDPC::durationToFSec(iter->second.rtt));
UDPC::durationToFSec(iter->second.rtt) * 1000.0f,
" milliseconds");
break;
}
}
@ -688,8 +727,8 @@ void UDPC::Context::update_impl() {
| (isResending ? 0x8 : 0);
recPktInfo.sender.addr = receivedData.sin6_addr;
recPktInfo.receiver.addr = in6addr_loopback;
recPktInfo.sender.port = receivedData.sin6_port;
recPktInfo.receiver.port = socketInfo.sin6_port;
recPktInfo.sender.port = ntohs(receivedData.sin6_port);
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) {
log(
@ -760,7 +799,7 @@ void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID,
}
uint32_t UDPC::generateConnectionID(Context &ctx) {
auto dist = std::uniform_int_distribution<uint32_t>(0, 0xFFFFFFFF);
auto dist = std::uniform_int_distribution<uint32_t>(0, 0x0FFFFFFF);
uint32_t id = dist(ctx.rng_engine);
while(ctx.idMap.find(id) != ctx.idMap.end()) {
id = dist(ctx.rng_engine);
@ -821,12 +860,16 @@ UDPC_ConnectionId UDPC_create_id_anyaddr(uint16_t port) {
UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) {
UDPC::Context *ctx = new UDPC::Context(false);
ctx->flags.set(1, isClient);
ctx->flags.set(1, isClient != 0);
ctx->log(UDPC_LoggingType::INFO, "Got listen addr ",
UDPC_atostr((UDPC_HContext)ctx, listenId));
// create socket
ctx->socketHandle = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
if(ctx->socketHandle <= 0) {
// TODO maybe different way of handling init fail
ctx->log(UDPC_LoggingType::ERROR, "Failed to create socket");
delete ctx;
return nullptr;
}
@ -841,9 +884,12 @@ UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) {
ctx->socketInfo.sin6_family = AF_INET6;
ctx->socketInfo.sin6_addr = listenId.addr;
ctx->socketInfo.sin6_port = htons(listenId.port);
ctx->socketInfo.sin6_flowinfo = 0;
ctx->socketInfo.sin6_scope_id = 0;
if(bind(ctx->socketHandle, (const struct sockaddr *)&ctx->socketInfo,
sizeof(struct sockaddr_in6)) < 0) {
// TODO maybe different way of handling init fail
ctx->log(UDPC_LoggingType::ERROR, "Failed to bind socket");
CleanupSocket(ctx->socketHandle);
delete ctx;
return nullptr;
@ -869,11 +915,14 @@ UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) {
{
#endif
// TODO maybe different way of handling init fail
ctx->log(UDPC_LoggingType::ERROR, "Failed to set nonblocking on socket");
CleanupSocket(ctx->socketHandle);
delete ctx;
return nullptr;
}
ctx->log(UDPC_LoggingType::INFO, "Initialized UDPC");
return (UDPC_HContext) ctx;
}
@ -886,6 +935,8 @@ UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId,
ctx->flags.set(0);
ctx->thread = std::thread(UDPC::threadedUpdate, ctx);
ctx->log(UDPC_LoggingType::INFO, "Initialized threaded UDPC");
return (UDPC_HContext) ctx;
}
@ -916,21 +967,30 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
return;
}
c->log(UDPC_LoggingType::INFO, "client_initiate_connection: Got peer a = ",
UDPC_atostr((UDPC_HContext)ctx, connectionId),
", p = ", connectionId.port);
std::lock_guard<std::mutex> lock(c->mutex);
UDPC::ConnectionData newCon(false, c);
UDPC::ConnectionData newCon(false, c, connectionId.addr, connectionId.port);
c->conMap.insert(std::make_pair(connectionId, std::move(newCon)));
auto addrConIter = c->addrConMap.find(connectionId.addr);
if(addrConIter == c->addrConMap.end()) {
auto insertResult = c->addrConMap.insert(std::make_pair(
connectionId.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}
));
assert(insertResult.second);
addrConIter = insertResult.first;
if(c->conMap.find(connectionId) == c->conMap.end()) {
c->conMap.insert(std::make_pair(connectionId, std::move(newCon)));
auto addrConIter = c->addrConMap.find(connectionId.addr);
if(addrConIter == c->addrConMap.end()) {
auto insertResult = c->addrConMap.insert(std::make_pair(
connectionId.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}
));
assert(insertResult.second);
addrConIter = insertResult.first;
}
addrConIter->second.insert(connectionId);
c->log(UDPC_LoggingType::VERBOSE, "client_initiate_connection: Initiating connection...");
} else {
c->log(UDPC_LoggingType::ERROR, "client_initiate_connection: Already connected to peer");
}
addrConIter->second.insert(connectionId);
}
int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
@ -1037,6 +1097,17 @@ int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool
return 0;
}
int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
std::lock_guard<std::mutex> lock(c->mutex);
return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
}
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
@ -1046,7 +1117,7 @@ uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
return c->protocolID.exchange(id);
}
UDPC_LoggingType set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) {
UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return UDPC_LoggingType::SILENT;

View file

@ -35,7 +35,7 @@
// other defines
#define UDPC_PACKET_MAX_SIZE 8192
#define UDPC_DEFAULT_PROTOCOL_ID 1357924680
#define UDPC_DEFAULT_PROTOCOL_ID 1357924680 // 0x50f04948
#ifdef __cplusplus
#include <cstdint>
@ -94,9 +94,11 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting);
int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr);
int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id);
UDPC_LoggingType set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx);

View file

@ -1,6 +1,8 @@
#include <cstring>
#include <string>
#include <cstdio>
#include <thread>
#include <chrono>
#include <UDPConnection.h>
@ -24,6 +26,7 @@ int main(int argc, char **argv) {
const char *listenPort = nullptr;
const char *connectionAddr = nullptr;
const char *connectionPort = nullptr;
unsigned int tickLimit = 15;
while(argc > 0) {
if(std::strcmp(argv[0], "-c") == 0) {
isClient = true;
@ -41,6 +44,10 @@ int main(int argc, char **argv) {
} else if(std::strcmp(argv[0], "-cp") == 0 && argc > 1) {
--argc; ++argv;
connectionPort = argv[0];
} else if(std::strcmp(argv[0], "-t") == 0 && argc > 1) {
--argc; ++argv;
tickLimit = std::atoi(argv[0]);
printf("Set tick limit to %u\n", tickLimit);
} else {
printf("ERROR: invalid argument \"%s\"\n", argv[0]);
usage();
@ -50,5 +57,27 @@ int main(int argc, char **argv) {
--argc; ++argv;
}
UDPC_ConnectionId connectionId;
if(isClient) {
connectionId = UDPC_create_id(UDPC_strtoa(connectionAddr), std::atoi(connectionPort));
}
auto context = UDPC_init_threaded_update(UDPC_create_id(UDPC_strtoa(listenAddr), std::atoi(listenPort)), isClient ? 1 : 0);
if(!context) {
puts("ERROR: context is NULL");
return 1;
}
UDPC_set_logging_type(context, UDPC_LoggingType::INFO);
unsigned int tick = 0;
while(true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
if(isClient && UDPC_has_connection(context, connectionId) == 0) {
UDPC_client_initiate_connection(context, connectionId);
}
if(tick++ > tickLimit) {
break;
}
}
UDPC_destroy(context);
return 0;
}