diff --git a/cpp_impl/src/UDPC_Defines.hpp b/cpp_impl/src/UDPC_Defines.hpp index 073eba9..162b56b 100644 --- a/cpp_impl/src/UDPC_Defines.hpp +++ b/cpp_impl/src/UDPC_Defines.hpp @@ -69,7 +69,7 @@ struct IPV6_Hasher { struct ConnectionData { ConnectionData(); - ConnectionData(bool isServer, Context *ctx); + ConnectionData(bool isServer, Context *ctx, struct in6_addr addr, uint16_t port); // copy ConnectionData(const ConnectionData& other) = delete; diff --git a/cpp_impl/src/UDPConnection.cpp b/cpp_impl/src/UDPConnection.cpp index e9675de..bd5991e 100644 --- a/cpp_impl/src/UDPConnection.cpp +++ b/cpp_impl/src/UDPConnection.cpp @@ -57,6 +57,8 @@ timer(std::chrono::steady_clock::duration::zero()), toggleT(UDPC::THIRTY_SECONDS), toggleTimer(std::chrono::steady_clock::duration::zero()), toggledTimer(std::chrono::steady_clock::duration::zero()), +addr({0}), +port(0), sentPkts(), sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), @@ -66,10 +68,28 @@ sent(std::chrono::steady_clock::now()), rtt(std::chrono::steady_clock::duration::zero()) { flags.set(0); + flags.reset(1); } -UDPC::ConnectionData::ConnectionData(bool isServer, Context *ctx) : -UDPC::ConnectionData::ConnectionData() +UDPC::ConnectionData::ConnectionData(bool isServer, Context *ctx, struct in6_addr addr, uint16_t port) : +flags(), +id(0), +lseq(0), +rseq(0), +ack(0xFFFFFFFF), +timer(std::chrono::steady_clock::duration::zero()), +toggleT(UDPC::THIRTY_SECONDS), +toggleTimer(std::chrono::steady_clock::duration::zero()), +toggledTimer(std::chrono::steady_clock::duration::zero()), +addr(addr), +port(port), +sentPkts(), +sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), +priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), +receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE), +received(std::chrono::steady_clock::now()), +sent(std::chrono::steady_clock::now()), +rtt(std::chrono::steady_clock::duration::zero()) { flags.set(3); if(isServer) { @@ -83,7 +103,7 @@ UDPC::ConnectionData::ConnectionData() void UDPC::ConnectionData::cleanupSentPkts() { uint32_t id; while(sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) { - id = *((uint32_t*)(sentPkts.front().data + 8)); + id = ntohl(*((uint32_t*)(sentPkts.front().data + 8))); auto iter = sentInfoMap.find(id); assert(iter != sentInfoMap.end() && "Sent packet must have correspoding entry in sentInfoMap"); @@ -111,6 +131,7 @@ mutex() } else { flags.reset(0); } + flags.set(2); rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count()); @@ -133,7 +154,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::VERBOSE, "Timed out connection with ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); continue; } @@ -147,7 +168,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::INFO, "Switching to bad mode in connection with ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); iter->second.flags.reset(1); if(iter->second.toggledTimer <= UDPC::TEN_SECONDS) { @@ -173,7 +194,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::INFO, "Switching to good mode in connection with ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); iter->second.flags.set(1); } @@ -183,12 +204,16 @@ void UDPC::Context::update_impl() { } iter->second.timer += temp_dt_fs; - if(iter->second.timer >= (iter->second.flags.test(1) - ? UDPC::GOOD_MODE_SEND_RATE - : UDPC::BAD_MODE_SEND_RATE)) { - iter->second.timer -= (iter->second.flags.test(1) - ? UDPC::GOOD_MODE_SEND_RATE : UDPC::BAD_MODE_SEND_RATE); - iter->second.flags.set(0); + if(iter->second.flags.test(1)) { + if(iter->second.timer >= UDPC::GOOD_MODE_SEND_RATE) { + iter->second.timer -= UDPC::GOOD_MODE_SEND_RATE; + iter->second.flags.set(0); + } + } else { + if(iter->second.timer >= UDPC::BAD_MODE_SEND_RATE) { + iter->second.timer -= UDPC::BAD_MODE_SEND_RATE; + iter->second.flags.set(0); + } } } for(auto iter = removed.begin(); iter != removed.end(); ++iter) { @@ -245,6 +270,8 @@ void UDPC::Context::update_impl() { destinationInfo.sin6_family = AF_INET6; std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = 0; long int sentBytes = sendto( socketHandle, buf.get(), @@ -257,9 +284,14 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::ERROR, "Failed to send packet to initiate connection to ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); continue; + } else { + log(UDPC_LoggingType::INFO, "Sent initiate connection to ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ", port = ", + iter->second.port); } } else { // is server, initiate connection to client @@ -280,6 +312,8 @@ void UDPC::Context::update_impl() { destinationInfo.sin6_family = AF_INET6; std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = 0; long int sentBytes = sendto( socketHandle, buf.get(), @@ -292,7 +326,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::ERROR, "Failed to send packet to initiate connection to ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); continue; } @@ -322,6 +356,8 @@ void UDPC::Context::update_impl() { destinationInfo.sin6_family = AF_INET6; std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = 0; long int sentBytes = sendto( socketHandle, buf.get(), @@ -334,7 +370,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::ERROR, "Failed to send heartbeat packet to ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); continue; } @@ -344,7 +380,7 @@ void UDPC::Context::update_impl() { pInfo.receiver.addr = iter->first.addr; pInfo.sender.port = socketInfo.sin6_port; pInfo.receiver.port = iter->second.port; - *((uint32_t*)(pInfo.data + 8)) = iter->second.lseq - 1; + *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1); iter->second.sentPkts.push_back(std::move(pInfo)); iter->second.cleanupSentPkts(); @@ -393,7 +429,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::ERROR, "Failed to send packet to ", UDPC_atostr((UDPC_HContext)this, iter->first), - ":", + ", port = ", iter->second.port); continue; } @@ -456,7 +492,7 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::INFO, "Received packet is smaller than header, ignoring packet from ", UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", + ", port = ", receivedData.sin6_port); return; } @@ -468,15 +504,15 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::INFO, "Received packet has invalid protocol id, ignoring packet from ", UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); + ", port = ", + ntohs(receivedData.sin6_port)); return; } uint32_t conID = ntohl(*((uint32_t*)(recvBuf + 4))); uint32_t seqID = ntohl(*((uint32_t*)(recvBuf + 8))); uint32_t rseq = ntohl(*((uint32_t*)(recvBuf + 12))); - uint32_t ack = htonl(*((uint32_t*)(recvBuf + 16))); + uint32_t ack = ntohl(*((uint32_t*)(recvBuf + 16))); bool isConnect = conID & UDPC_ID_CONNECT; bool isPing = conID & UDPC_ID_PING; @@ -491,15 +527,14 @@ void UDPC::Context::update_impl() { if(!flags.test(1) && conMap.find(identifier) == conMap.end()) { // is receiving as server, connection did not already exist + UDPC::ConnectionData newConnection(true, this, receivedData.sin6_addr, ntohs(receivedData.sin6_port)); log( UDPC_LoggingType::VERBOSE, "Establishing connection with client ", UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); - UDPC::ConnectionData newConnection(true, this); - newConnection.addr = receivedData.sin6_addr; - newConnection.port = ntohs(receivedData.sin6_port); + ", port = ", + ntohs(receivedData.sin6_port), + ", giving client id = ", newConnection.id); idMap.insert(std::make_pair(newConnection.id, identifier)); conMap.insert(std::make_pair(identifier, std::move(newConnection))); @@ -529,8 +564,9 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::VERBOSE, "Established connection with server ", UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); + ", port = ", + ntohs(receivedData.sin6_port), + ", got id = ", conID); // TODO trigger event client established connection with server } return; @@ -540,8 +576,7 @@ void UDPC::Context::update_impl() { if(iter == conMap.end() || iter->second.flags.test(3) || !iter->second.flags.test(4) || iter->second.id != conID) { return; - } - else if(isPing) { + } else if(isPing) { iter->second.flags.set(0); } @@ -550,8 +585,11 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::INFO, "Received valid packet from ", UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); + ", port = ", + ntohs(receivedData.sin6_port), + ", packet id = ", seqID, + ", good mode = ", iter->second.flags.test(1) ? "yes" : "no", + isPing ? ", ping" : ""); // update rtt for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) { @@ -572,7 +610,8 @@ void UDPC::Context::update_impl() { log( UDPC_LoggingType::INFO, "RTT: ", - UDPC::durationToFSec(iter->second.rtt)); + UDPC::durationToFSec(iter->second.rtt) * 1000.0f, + " milliseconds"); break; } } @@ -688,8 +727,8 @@ void UDPC::Context::update_impl() { | (isResending ? 0x8 : 0); recPktInfo.sender.addr = receivedData.sin6_addr; recPktInfo.receiver.addr = in6addr_loopback; - recPktInfo.sender.port = receivedData.sin6_port; - recPktInfo.receiver.port = socketInfo.sin6_port; + recPktInfo.sender.port = ntohs(receivedData.sin6_port); + recPktInfo.receiver.port = ntohs(socketInfo.sin6_port); if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) { log( @@ -760,7 +799,7 @@ void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID, } uint32_t UDPC::generateConnectionID(Context &ctx) { - auto dist = std::uniform_int_distribution(0, 0xFFFFFFFF); + auto dist = std::uniform_int_distribution(0, 0x0FFFFFFF); uint32_t id = dist(ctx.rng_engine); while(ctx.idMap.find(id) != ctx.idMap.end()) { id = dist(ctx.rng_engine); @@ -821,12 +860,16 @@ UDPC_ConnectionId UDPC_create_id_anyaddr(uint16_t port) { UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) { UDPC::Context *ctx = new UDPC::Context(false); - ctx->flags.set(1, isClient); + ctx->flags.set(1, isClient != 0); + + ctx->log(UDPC_LoggingType::INFO, "Got listen addr ", + UDPC_atostr((UDPC_HContext)ctx, listenId)); // create socket ctx->socketHandle = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); if(ctx->socketHandle <= 0) { // TODO maybe different way of handling init fail + ctx->log(UDPC_LoggingType::ERROR, "Failed to create socket"); delete ctx; return nullptr; } @@ -841,9 +884,12 @@ UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) { ctx->socketInfo.sin6_family = AF_INET6; ctx->socketInfo.sin6_addr = listenId.addr; ctx->socketInfo.sin6_port = htons(listenId.port); + ctx->socketInfo.sin6_flowinfo = 0; + ctx->socketInfo.sin6_scope_id = 0; if(bind(ctx->socketHandle, (const struct sockaddr *)&ctx->socketInfo, sizeof(struct sockaddr_in6)) < 0) { // TODO maybe different way of handling init fail + ctx->log(UDPC_LoggingType::ERROR, "Failed to bind socket"); CleanupSocket(ctx->socketHandle); delete ctx; return nullptr; @@ -869,11 +915,14 @@ UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) { { #endif // TODO maybe different way of handling init fail + ctx->log(UDPC_LoggingType::ERROR, "Failed to set nonblocking on socket"); CleanupSocket(ctx->socketHandle); delete ctx; return nullptr; } + ctx->log(UDPC_LoggingType::INFO, "Initialized UDPC"); + return (UDPC_HContext) ctx; } @@ -886,6 +935,8 @@ UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId, ctx->flags.set(0); ctx->thread = std::thread(UDPC::threadedUpdate, ctx); + ctx->log(UDPC_LoggingType::INFO, "Initialized threaded UDPC"); + return (UDPC_HContext) ctx; } @@ -916,21 +967,30 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec return; } + c->log(UDPC_LoggingType::INFO, "client_initiate_connection: Got peer a = ", + UDPC_atostr((UDPC_HContext)ctx, connectionId), + ", p = ", connectionId.port); + std::lock_guard lock(c->mutex); - UDPC::ConnectionData newCon(false, c); + UDPC::ConnectionData newCon(false, c, connectionId.addr, connectionId.port); - c->conMap.insert(std::make_pair(connectionId, std::move(newCon))); - auto addrConIter = c->addrConMap.find(connectionId.addr); - if(addrConIter == c->addrConMap.end()) { - auto insertResult = c->addrConMap.insert(std::make_pair( - connectionId.addr, - std::unordered_set{} - )); - assert(insertResult.second); - addrConIter = insertResult.first; + if(c->conMap.find(connectionId) == c->conMap.end()) { + c->conMap.insert(std::make_pair(connectionId, std::move(newCon))); + auto addrConIter = c->addrConMap.find(connectionId.addr); + if(addrConIter == c->addrConMap.end()) { + auto insertResult = c->addrConMap.insert(std::make_pair( + connectionId.addr, + std::unordered_set{} + )); + assert(insertResult.second); + addrConIter = insertResult.first; + } + addrConIter->second.insert(connectionId); + c->log(UDPC_LoggingType::VERBOSE, "client_initiate_connection: Initiating connection..."); + } else { + c->log(UDPC_LoggingType::ERROR, "client_initiate_connection: Already connected to peer"); } - addrConIter->second.insert(connectionId); } int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { @@ -1037,6 +1097,17 @@ int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool return 0; } +int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return 0; + } + + std::lock_guard lock(c->mutex); + + return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1; +} + uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { @@ -1046,7 +1117,7 @@ uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) { return c->protocolID.exchange(id); } -UDPC_LoggingType set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) { +UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { return UDPC_LoggingType::SILENT; diff --git a/cpp_impl/src/UDPConnection.h b/cpp_impl/src/UDPConnection.h index ebe25b2..71717cd 100644 --- a/cpp_impl/src/UDPConnection.h +++ b/cpp_impl/src/UDPConnection.h @@ -35,7 +35,7 @@ // other defines #define UDPC_PACKET_MAX_SIZE 8192 -#define UDPC_DEFAULT_PROTOCOL_ID 1357924680 +#define UDPC_DEFAULT_PROTOCOL_ID 1357924680 // 0x50f04948 #ifdef __cplusplus #include @@ -94,9 +94,11 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting); int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr); +int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId); + uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id); -UDPC_LoggingType set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType); +UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType); UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx); diff --git a/cpp_impl/src/test/UDPC_NetworkTest.cpp b/cpp_impl/src/test/UDPC_NetworkTest.cpp index 04b2f6f..554c1e2 100644 --- a/cpp_impl/src/test/UDPC_NetworkTest.cpp +++ b/cpp_impl/src/test/UDPC_NetworkTest.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include @@ -24,6 +26,7 @@ int main(int argc, char **argv) { const char *listenPort = nullptr; const char *connectionAddr = nullptr; const char *connectionPort = nullptr; + unsigned int tickLimit = 15; while(argc > 0) { if(std::strcmp(argv[0], "-c") == 0) { isClient = true; @@ -41,6 +44,10 @@ int main(int argc, char **argv) { } else if(std::strcmp(argv[0], "-cp") == 0 && argc > 1) { --argc; ++argv; connectionPort = argv[0]; + } else if(std::strcmp(argv[0], "-t") == 0 && argc > 1) { + --argc; ++argv; + tickLimit = std::atoi(argv[0]); + printf("Set tick limit to %u\n", tickLimit); } else { printf("ERROR: invalid argument \"%s\"\n", argv[0]); usage(); @@ -50,5 +57,27 @@ int main(int argc, char **argv) { --argc; ++argv; } + UDPC_ConnectionId connectionId; + if(isClient) { + connectionId = UDPC_create_id(UDPC_strtoa(connectionAddr), std::atoi(connectionPort)); + } + auto context = UDPC_init_threaded_update(UDPC_create_id(UDPC_strtoa(listenAddr), std::atoi(listenPort)), isClient ? 1 : 0); + if(!context) { + puts("ERROR: context is NULL"); + return 1; + } + UDPC_set_logging_type(context, UDPC_LoggingType::INFO); + unsigned int tick = 0; + while(true) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + if(isClient && UDPC_has_connection(context, connectionId) == 0) { + UDPC_client_initiate_connection(context, connectionId); + } + if(tick++ > tickLimit) { + break; + } + } + UDPC_destroy(context); + return 0; }