From e320a6343eb74accf0073646e8d6dde4803e7e85 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 16 Sep 2019 11:40:43 +0900 Subject: [PATCH] Impl threaded init/update --- cpp_impl/src/UDPC_Defines.hpp | 7 + cpp_impl/src/UDPConnection.cpp | 542 +++++++++++++++++---------------- 2 files changed, 289 insertions(+), 260 deletions(-) diff --git a/cpp_impl/src/UDPC_Defines.hpp b/cpp_impl/src/UDPC_Defines.hpp index 48f1af9..d03d09a 100644 --- a/cpp_impl/src/UDPC_Defines.hpp +++ b/cpp_impl/src/UDPC_Defines.hpp @@ -224,6 +224,8 @@ private: } // }}} public: + void update_impl(); + uint_fast32_t _contextIdentifier; char recvBuf[UDPC_PACKET_MAX_SIZE]; @@ -252,6 +254,9 @@ public: std::default_random_engine rng_engine; + std::thread thread; + std::atomic_bool threadRunning; + }; // struct Context Context *verifyContext(UDPC_HContext ctx); @@ -278,6 +283,8 @@ float timePointsToFSec( UDPC_PacketInfo get_empty_pinfo(); +void threadedUpdate(Context *ctx); + } // namespace UDPC bool operator ==(const UDPC_ConnectionId& a, const UDPC_ConnectionId& b); diff --git a/cpp_impl/src/UDPConnection.cpp b/cpp_impl/src/UDPConnection.cpp index 0a2f775..1b9106e 100644 --- a/cpp_impl/src/UDPConnection.cpp +++ b/cpp_impl/src/UDPConnection.cpp @@ -106,206 +106,26 @@ rng_engine() } rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count()); -} - -UDPC::Context *UDPC::verifyContext(UDPC_HContext ctx) { - if(ctx == nullptr) { - return nullptr; - } - UDPC::Context *c = (UDPC::Context *)ctx; - if(c->_contextIdentifier == UDPC_CONTEXT_IDENTIFIER) { - return c; - } else { - return nullptr; - } -} - -bool UDPC::isBigEndian() { - static std::optional isBigEndian = std::nullopt; - if(isBigEndian) { - return *isBigEndian; - } - union { - uint32_t i; - char c[4]; - } bint = {0x01020304}; - - isBigEndian = (bint.c[0] == 1); - return *isBigEndian; -} - -void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID, - uint32_t rseq, uint32_t ack, uint32_t *seqID, - int flags) { - uint32_t temp; - - temp = htonl(protocolID); - std::memcpy(data, &temp, 4); - temp = htonl(conID | ((flags & 0x1) != 0 ? UDPC_ID_CONNECT : 0) | - ((flags & 0x2) != 0 ? UDPC_ID_PING : 0) | - ((flags & 0x4) != 0 ? UDPC_ID_NO_REC_CHK : 0) | - ((flags & 0x8) != 0 ? UDPC_ID_RESENDING : 0)); - std::memcpy(data + 4, &temp, 4); - - if(seqID) { - temp = htonl(*seqID); - ++(*seqID); - } else { - temp = 0; - } - std::memcpy(data + 8, &temp, 4); - - temp = htonl(rseq); - std::memcpy(data + 12, &temp, 4); - temp = htonl(ack); - std::memcpy(data + 16, &temp, 4); -} - -uint32_t UDPC::generateConnectionID(Context &ctx) { - auto dist = std::uniform_int_distribution(0, 0xFFFFFFFF); - uint32_t id = dist(ctx.rng_engine); - while(ctx.idMap.find(id) != ctx.idMap.end()) { - id = dist(ctx.rng_engine); - } - return id; -} - -float UDPC::durationToFSec(const std::chrono::steady_clock::duration& duration) { - return (float)duration.count() - * (float)std::decay_t::period::num - / (float)std::decay_t::period::den; -} - -float UDPC::timePointsToFSec( - const std::chrono::steady_clock::time_point& older, - const std::chrono::steady_clock::time_point& newer) { - const auto dt = newer - older; - return (float)dt.count() - * (float)decltype(dt)::period::num / (float)decltype(dt)::period::den; -} - -UDPC_PacketInfo UDPC::get_empty_pinfo() { - return UDPC_PacketInfo { - {0}, // data (array) - 0, // flags - 0, // dataSize - { // sender - {0}, // ipv6 addr - 0 // port - }, - { // receiver - {0}, // ipv6 addr - 0 // port - }, - }; -} - -UDPC_ConnectionId UDPC_create_id(struct in6_addr addr, uint16_t port) { - return UDPC_ConnectionId{addr, port}; -} - -UDPC_ConnectionId UDPC_create_id_anyaddr(uint16_t port) { - return UDPC_ConnectionId{in6addr_any, port}; -} - -UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) { - UDPC::Context *ctx = new UDPC::Context(false); - ctx->flags.set(1, isClient); - - // create socket - ctx->socketHandle = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); - if(ctx->socketHandle <= 0) { - // TODO maybe different way of handling init fail - delete ctx; - return nullptr; - } - - // allow ipv4 connections on ipv6 socket - { - int no = 0; - setsockopt(ctx->socketHandle, IPPROTO_IPV6, IPV6_V6ONLY, &no, sizeof(no)); - } - - // bind socket - ctx->socketInfo.sin6_family = AF_INET6; - ctx->socketInfo.sin6_addr = listenId.addr; - ctx->socketInfo.sin6_port = htons(listenId.port); - if(bind(ctx->socketHandle, (const struct sockaddr *)&ctx->socketInfo, - sizeof(struct sockaddr_in6)) < 0) { - // TODO maybe different way of handling init fail - CleanupSocket(ctx->socketHandle); - delete ctx; - return nullptr; - } - - // TODO verify this is necessary to get the listen port - if(ctx->socketInfo.sin6_port == 0) { - struct sockaddr_in6 getInfo; - socklen_t size = sizeof(struct sockaddr_in6); - if(getsockname(ctx->socketHandle, (struct sockaddr *)&getInfo, &size) == 0) { - ctx->socketInfo.sin6_port = getInfo.sin6_port; - } - } - - // set non-blocking on socket -#if UDPC_PLATFORM == UDPC_PLATFORM_MAC || UDPC_PLATFORM == UDPC_PLATFORM_LINUX - int nonblocking = 1; - if(fcntl(ctx->socketHandle, F_SETFL, O_NONBLOCK, nonblocking) == -1) { -#elif UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS - DWORD nonblocking = 1; - if(ioctlsocket(ctx->socketHandle, FIONBIO, &nonblocking) != 0) { -#else - { -#endif - // TODO maybe different way of handling init fail - CleanupSocket(ctx->socketHandle); - delete ctx; - return nullptr; - } - return (UDPC_HContext) ctx; + threadRunning.store(true); } -UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId, - int isClient) { - UDPC::Context *ctx = (UDPC::Context *)UDPC_init(listenId, isClient); - if(!ctx) { - return nullptr; - } - ctx->flags.set(0); - - return (UDPC_HContext) ctx; -} - -void UDPC_destroy(UDPC_HContext ctx) { - UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx); - if(UDPC_ctx) { - delete UDPC_ctx; - } -} - -void UDPC_update(UDPC_HContext ctx) { - UDPC::Context *c = UDPC::verifyContext(ctx); - if(!c || c->flags.test(0)) { - // invalid or is threaded, update should not be called - return; - } - +void UDPC::Context::update_impl() { const auto now = std::chrono::steady_clock::now(); - c->lastUpdated = now; + lastUpdated = now; std::chrono::steady_clock::duration temp_dt_fs; { // check timed out, check good/bad mode with rtt, remove timed out std::vector removed; - for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { + for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { temp_dt_fs = now - iter->second.received; if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) { removed.push_back(iter->first); - c->log( + log( UDPC_LoggingType::VERBOSE, "Timed out connection with ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); continue; @@ -316,10 +136,10 @@ void UDPC_update(UDPC_HContext ctx) { iter->second.toggledTimer += temp_dt_fs; if(iter->second.flags.test(1) && !iter->second.flags.test(2)) { // good mode, bad rtt - c->log( + log( UDPC_LoggingType::INFO, "Switching to bad mode in connection with ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); iter->second.flags.reset(1); @@ -342,10 +162,10 @@ void UDPC_update(UDPC_HContext ctx) { if(iter->second.toggledTimer >= iter->second.toggleT) { iter->second.toggleTimer = std::chrono::steady_clock::duration::zero(); iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); - c->log( + log( UDPC_LoggingType::INFO, "Switching to good mode in connection with ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); iter->second.flags.set(1); @@ -365,38 +185,38 @@ void UDPC_update(UDPC_HContext ctx) { } } for(auto iter = removed.begin(); iter != removed.end(); ++iter) { - auto addrConIter = c->addrConMap.find(iter->addr); - assert(addrConIter != c->addrConMap.end() + auto addrConIter = addrConMap.find(iter->addr); + assert(addrConIter != addrConMap.end() && "addrConMap must have an entry for a current connection"); auto addrConSetIter = addrConIter->second.find(*iter); assert(addrConSetIter != addrConIter->second.end() && "nested set in addrConMap must have an entry for a current connection"); addrConIter->second.erase(addrConSetIter); if(addrConIter->second.empty()) { - c->addrConMap.erase(addrConIter); + addrConMap.erase(addrConIter); } - auto cIter = c->conMap.find(*iter); - assert(cIter != c->conMap.end() + auto cIter = conMap.find(*iter); + assert(cIter != conMap.end() && "conMap must have the entry set to be removed"); if(cIter->second.flags.test(4)) { - c->idMap.erase(cIter->second.id); + idMap.erase(cIter->second.id); } - c->conMap.erase(cIter); + conMap.erase(cIter); } } // update send (only if triggerSend flag is set) - for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { + for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { if(!iter->second.flags.test(0)) { continue; } iter->second.flags.reset(0); if(iter->second.flags.test(3)) { - if(c->flags.test(1)) { + if(flags.test(1)) { // is initiating connection to server auto initDT = now - iter->second.sent; if(initDT < UDPC::INIT_PKT_INTERVAL_DT) { @@ -407,7 +227,7 @@ void UDPC_update(UDPC_HContext ctx) { std::unique_ptr buf = std::make_unique(20); UDPC::preparePacket( buf.get(), - c->protocolID, + protocolID, 0, 0, 0xFFFFFFFF, @@ -419,17 +239,17 @@ void UDPC_update(UDPC_HContext ctx) { std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); long int sentBytes = sendto( - c->socketHandle, + socketHandle, buf.get(), 20, 0, (struct sockaddr*) &destinationInfo, sizeof(struct sockaddr_in6)); if(sentBytes != 20) { - c->log( + log( UDPC_LoggingType::ERROR, "Failed to send packet to initiate connection to ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); continue; @@ -442,7 +262,7 @@ void UDPC_update(UDPC_HContext ctx) { std::unique_ptr buf = std::make_unique(20); UDPC::preparePacket( buf.get(), - c->protocolID, + protocolID, iter->second.id, iter->second.rseq, iter->second.ack, @@ -454,17 +274,17 @@ void UDPC_update(UDPC_HContext ctx) { std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); long int sentBytes = sendto( - c->socketHandle, + socketHandle, buf.get(), 20, 0, (struct sockaddr*) &destinationInfo, sizeof(struct sockaddr_in6)); if(sentBytes != 20) { - c->log( + log( UDPC_LoggingType::ERROR, "Failed to send packet to initiate connection to ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); continue; @@ -484,7 +304,7 @@ void UDPC_update(UDPC_HContext ctx) { std::unique_ptr buf = std::make_unique(20); UDPC::preparePacket( buf.get(), - c->protocolID, + protocolID, iter->second.id, iter->second.rseq, iter->second.ack, @@ -496,17 +316,17 @@ void UDPC_update(UDPC_HContext ctx) { std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); long int sentBytes = sendto( - c->socketHandle, + socketHandle, buf.get(), 20, 0, (struct sockaddr*) &destinationInfo, sizeof(struct sockaddr_in6)); if(sentBytes != 20) { - c->log( + log( UDPC_LoggingType::ERROR, "Failed to send heartbeat packet to ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); continue; @@ -515,7 +335,7 @@ void UDPC_update(UDPC_HContext ctx) { UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); pInfo.sender.addr = in6addr_loopback; pInfo.receiver.addr = iter->first.addr; - pInfo.sender.port = c->socketInfo.sin6_port; + pInfo.sender.port = socketInfo.sin6_port; pInfo.receiver.port = iter->second.port; *((uint32_t*)(pInfo.data + 8)) = iter->second.lseq - 1; @@ -542,7 +362,7 @@ void UDPC_update(UDPC_HContext ctx) { std::unique_ptr buf = std::make_unique(20 + pInfo.dataSize); UDPC::preparePacket( buf.get(), - c->protocolID, + protocolID, iter->second.id, iter->second.rseq, iter->second.ack, @@ -555,17 +375,17 @@ void UDPC_update(UDPC_HContext ctx) { std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); destinationInfo.sin6_port = htons(iter->second.port); long int sentBytes = sendto( - c->socketHandle, + socketHandle, buf.get(), pInfo.dataSize + 20, 0, (struct sockaddr*) &destinationInfo, sizeof(struct sockaddr_in6)); if(sentBytes != 20 + pInfo.dataSize) { - c->log( + log( UDPC_LoggingType::ERROR, "Failed to send packet to ", - UDPC_atostr(ctx, iter->first), + UDPC_atostr((UDPC_HContext)this, iter->first), ":", iter->second.port); continue; @@ -579,7 +399,7 @@ void UDPC_update(UDPC_HContext ctx) { sentPInfo.dataSize = 20 + pInfo.dataSize; sentPInfo.sender.addr = in6addr_loopback; sentPInfo.receiver.addr = iter->first.addr; - sentPInfo.sender.port = c->socketInfo.sin6_port; + sentPInfo.sender.port = socketInfo.sin6_port; sentPInfo.receiver.port = iter->second.port; iter->second.sentPkts.push_back(std::move(pInfo)); @@ -591,7 +411,7 @@ void UDPC_update(UDPC_HContext ctx) { sentPInfo.dataSize = 0; sentPInfo.sender.addr = in6addr_loopback; sentPInfo.receiver.addr = iter->first.addr; - sentPInfo.sender.port = c->socketInfo.sin6_port; + sentPInfo.sender.port = socketInfo.sin6_port; sentPInfo.receiver.port = iter->second.port; *((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1; @@ -613,8 +433,8 @@ void UDPC_update(UDPC_HContext ctx) { struct sockaddr_in6 receivedData; socklen_t receivedDataSize = sizeof(receivedData); int bytes = recvfrom( - c->socketHandle, - c->recvBuf, + socketHandle, + recvBuf, UDPC_PACKET_MAX_SIZE, 0, (struct sockaddr*) &receivedData, @@ -625,31 +445,31 @@ void UDPC_update(UDPC_HContext ctx) { return; } else if(bytes < 20) { // packet size is too small, invalid packet - c->log( + log( UDPC_LoggingType::INFO, "Received packet is smaller than header, ignoring packet from ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), ":", receivedData.sin6_port); return; } - uint32_t temp = ntohl(*((uint32_t*)c->recvBuf)); - if(temp != c->protocolID) { + uint32_t temp = ntohl(*((uint32_t*)recvBuf)); + if(temp != protocolID) { // Invalid protocol id in packet - c->log( + log( UDPC_LoggingType::INFO, "Received packet has invalid protocol id, ignoring packet from ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), ":", receivedData.sin6_port); return; } - uint32_t conID = ntohl(*((uint32_t*)(c->recvBuf + 4))); - uint32_t seqID = ntohl(*((uint32_t*)(c->recvBuf + 8))); - uint32_t rseq = ntohl(*((uint32_t*)(c->recvBuf + 12))); - uint32_t ack = htonl(*((uint32_t*)(c->recvBuf + 16))); + uint32_t conID = ntohl(*((uint32_t*)(recvBuf + 4))); + uint32_t seqID = ntohl(*((uint32_t*)(recvBuf + 8))); + uint32_t rseq = ntohl(*((uint32_t*)(recvBuf + 12))); + uint32_t ack = htonl(*((uint32_t*)(recvBuf + 16))); bool isConnect = conID & UDPC_ID_CONNECT; bool isPing = conID & UDPC_ID_PING; @@ -659,26 +479,26 @@ void UDPC_update(UDPC_HContext ctx) { UDPC_ConnectionId identifier{receivedData.sin6_addr, ntohs(receivedData.sin6_port)}; - if(isConnect && c->flags.test(2)) { + if(isConnect && flags.test(2)) { // is connect packet and is accepting new connections - if(!c->flags.test(1) - && c->conMap.find(identifier) == c->conMap.end()) { + if(!flags.test(1) + && conMap.find(identifier) == conMap.end()) { // is receiving as server, connection did not already exist - c->log( + log( UDPC_LoggingType::VERBOSE, "Establishing connection with client ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), ":", receivedData.sin6_port); - UDPC::ConnectionData newConnection(true, c); + UDPC::ConnectionData newConnection(true, this); newConnection.addr = receivedData.sin6_addr; newConnection.port = ntohs(receivedData.sin6_port); - c->idMap.insert(std::make_pair(newConnection.id, identifier)); - c->conMap.insert(std::make_pair(identifier, std::move(newConnection))); - auto addrConIter = c->addrConMap.find(identifier.addr); - if(addrConIter == c->addrConMap.end()) { - auto insertResult = c->addrConMap.insert( + idMap.insert(std::make_pair(newConnection.id, identifier)); + conMap.insert(std::make_pair(identifier, std::move(newConnection))); + auto addrConIter = addrConMap.find(identifier.addr); + if(addrConIter == addrConMap.end()) { + auto insertResult = addrConMap.insert( std::make_pair( identifier.addr, std::unordered_set{} @@ -689,19 +509,19 @@ void UDPC_update(UDPC_HContext ctx) { } addrConIter->second.insert(identifier); // TODO trigger event server established connection with client - } else if (c->flags.test(1)) { + } else if (flags.test(1)) { // is client - auto iter = c->conMap.find(identifier); - if(iter == c->conMap.end() || !iter->second.flags.test(3)) { + auto iter = conMap.find(identifier); + if(iter == conMap.end() || !iter->second.flags.test(3)) { return; } iter->second.flags.reset(3); iter->second.id = conID; iter->second.flags.set(4); - c->log( + log( UDPC_LoggingType::VERBOSE, "Established connection with server ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), ":", receivedData.sin6_port); // TODO trigger event client established connection with server @@ -709,8 +529,8 @@ void UDPC_update(UDPC_HContext ctx) { return; } - auto iter = c->conMap.find(identifier); - if(iter == c->conMap.end() || iter->second.flags.test(3) + auto iter = conMap.find(identifier); + if(iter == conMap.end() || iter->second.flags.test(3) || !iter->second.flags.test(4) || iter->second.id != conID) { return; } @@ -719,10 +539,10 @@ void UDPC_update(UDPC_HContext ctx) { } // packet is valid - c->log( + log( UDPC_LoggingType::INFO, "Received valid packet from ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), ":", receivedData.sin6_port); @@ -742,7 +562,7 @@ void UDPC_update(UDPC_HContext ctx) { iter->second.flags.set(2, iter->second.rtt <= UDPC::GOOD_RTT_LIMIT); - c->log( + log( UDPC_LoggingType::INFO, "RTT: ", UDPC::durationToFSec(iter->second.rtt)); @@ -774,7 +594,7 @@ void UDPC_update(UDPC_HContext ctx) { auto duration = now - sentInfoIter->second->sentTime; if(duration > UDPC::PACKET_TIMEOUT_TIME) { if(sentIter->dataSize <= 20) { - c->log( + log( UDPC_LoggingType::INFO, "Timed out packet has no payload (probably " "heartbeat packet), ignoring it"); @@ -809,7 +629,7 @@ void UDPC_update(UDPC_HContext ctx) { diff = 0xFFFFFFFF - seqID + 1 + iter->second.rseq; if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) { // already received packet - c->log( + log( UDPC_LoggingType::INFO, "Received packet is already marked as received, ignoring it"); return; @@ -823,7 +643,7 @@ void UDPC_update(UDPC_HContext ctx) { // sequence is older if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) { // already received packet - c->log( + log( UDPC_LoggingType::INFO, "Received packet is already marked as received, ignoring it"); return; @@ -838,21 +658,21 @@ void UDPC_update(UDPC_HContext ctx) { } } else { // already received packet - c->log( + log( UDPC_LoggingType::INFO, "Received packet is already marked as received, ignoring it"); return; } if(isOutOfOrder) { - c->log( + log( UDPC_LoggingType::VERBOSE, "Received packet is out of order"); } if(bytes > 20) { UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo(); - std::memcpy(recPktInfo.data, c->recvBuf, bytes); + std::memcpy(recPktInfo.data, recvBuf, bytes); recPktInfo.dataSize = bytes; recPktInfo.flags = (isConnect ? 0x1 : 0) @@ -862,10 +682,10 @@ void UDPC_update(UDPC_HContext ctx) { recPktInfo.sender.addr = receivedData.sin6_addr; recPktInfo.receiver.addr = in6addr_loopback; recPktInfo.sender.port = receivedData.sin6_port; - recPktInfo.receiver.port = c->socketInfo.sin6_port; + recPktInfo.receiver.port = socketInfo.sin6_port; if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) { - c->log( + log( UDPC_LoggingType::WARNING, "receivedPkts is full, removing oldest entry to make room"); iter->second.receivedPkts.pop(); @@ -873,12 +693,214 @@ void UDPC_update(UDPC_HContext ctx) { iter->second.receivedPkts.push(recPktInfo); } else if(bytes == 20) { - c->log( + log( UDPC_LoggingType::VERBOSE, "Received packet has no payload"); } } +UDPC::Context *UDPC::verifyContext(UDPC_HContext ctx) { + if(ctx == nullptr) { + return nullptr; + } + UDPC::Context *c = (UDPC::Context *)ctx; + if(c->_contextIdentifier == UDPC_CONTEXT_IDENTIFIER) { + return c; + } else { + return nullptr; + } +} + +bool UDPC::isBigEndian() { + static std::optional isBigEndian = std::nullopt; + if(isBigEndian) { + return *isBigEndian; + } + union { + uint32_t i; + char c[4]; + } bint = {0x01020304}; + + isBigEndian = (bint.c[0] == 1); + return *isBigEndian; +} + +void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID, + uint32_t rseq, uint32_t ack, uint32_t *seqID, + int flags) { + uint32_t temp; + + temp = htonl(protocolID); + std::memcpy(data, &temp, 4); + temp = htonl(conID | ((flags & 0x1) != 0 ? UDPC_ID_CONNECT : 0) | + ((flags & 0x2) != 0 ? UDPC_ID_PING : 0) | + ((flags & 0x4) != 0 ? UDPC_ID_NO_REC_CHK : 0) | + ((flags & 0x8) != 0 ? UDPC_ID_RESENDING : 0)); + std::memcpy(data + 4, &temp, 4); + + if(seqID) { + temp = htonl(*seqID); + ++(*seqID); + } else { + temp = 0; + } + std::memcpy(data + 8, &temp, 4); + + temp = htonl(rseq); + std::memcpy(data + 12, &temp, 4); + temp = htonl(ack); + std::memcpy(data + 16, &temp, 4); +} + +uint32_t UDPC::generateConnectionID(Context &ctx) { + auto dist = std::uniform_int_distribution(0, 0xFFFFFFFF); + uint32_t id = dist(ctx.rng_engine); + while(ctx.idMap.find(id) != ctx.idMap.end()) { + id = dist(ctx.rng_engine); + } + return id; +} + +float UDPC::durationToFSec(const std::chrono::steady_clock::duration& duration) { + return (float)duration.count() + * (float)std::decay_t::period::num + / (float)std::decay_t::period::den; +} + +float UDPC::timePointsToFSec( + const std::chrono::steady_clock::time_point& older, + const std::chrono::steady_clock::time_point& newer) { + const auto dt = newer - older; + return (float)dt.count() + * (float)decltype(dt)::period::num / (float)decltype(dt)::period::den; +} + +UDPC_PacketInfo UDPC::get_empty_pinfo() { + return UDPC_PacketInfo { + {0}, // data (array) + 0, // flags + 0, // dataSize + { // sender + {0}, // ipv6 addr + 0 // port + }, + { // receiver + {0}, // ipv6 addr + 0 // port + }, + }; +} + +void UDPC::threadedUpdate(Context *ctx) { + auto now = std::chrono::steady_clock::now(); + decltype(now) nextNow; + while(ctx->threadRunning.load()) { + now = std::chrono::steady_clock::now(); + ctx->update_impl(); + nextNow = std::chrono::steady_clock::now(); + std::this_thread::sleep_for(std::chrono::milliseconds(33) - (nextNow - now)); + } +} + +UDPC_ConnectionId UDPC_create_id(struct in6_addr addr, uint16_t port) { + return UDPC_ConnectionId{addr, port}; +} + +UDPC_ConnectionId UDPC_create_id_anyaddr(uint16_t port) { + return UDPC_ConnectionId{in6addr_any, port}; +} + +UDPC_HContext UDPC_init(UDPC_ConnectionId listenId, int isClient) { + UDPC::Context *ctx = new UDPC::Context(false); + ctx->flags.set(1, isClient); + + // create socket + ctx->socketHandle = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); + if(ctx->socketHandle <= 0) { + // TODO maybe different way of handling init fail + delete ctx; + return nullptr; + } + + // allow ipv4 connections on ipv6 socket + { + int no = 0; + setsockopt(ctx->socketHandle, IPPROTO_IPV6, IPV6_V6ONLY, &no, sizeof(no)); + } + + // bind socket + ctx->socketInfo.sin6_family = AF_INET6; + ctx->socketInfo.sin6_addr = listenId.addr; + ctx->socketInfo.sin6_port = htons(listenId.port); + if(bind(ctx->socketHandle, (const struct sockaddr *)&ctx->socketInfo, + sizeof(struct sockaddr_in6)) < 0) { + // TODO maybe different way of handling init fail + CleanupSocket(ctx->socketHandle); + delete ctx; + return nullptr; + } + + // TODO verify this is necessary to get the listen port + if(ctx->socketInfo.sin6_port == 0) { + struct sockaddr_in6 getInfo; + socklen_t size = sizeof(struct sockaddr_in6); + if(getsockname(ctx->socketHandle, (struct sockaddr *)&getInfo, &size) == 0) { + ctx->socketInfo.sin6_port = getInfo.sin6_port; + } + } + + // set non-blocking on socket +#if UDPC_PLATFORM == UDPC_PLATFORM_MAC || UDPC_PLATFORM == UDPC_PLATFORM_LINUX + int nonblocking = 1; + if(fcntl(ctx->socketHandle, F_SETFL, O_NONBLOCK, nonblocking) == -1) { +#elif UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS + DWORD nonblocking = 1; + if(ioctlsocket(ctx->socketHandle, FIONBIO, &nonblocking) != 0) { +#else + { +#endif + // TODO maybe different way of handling init fail + CleanupSocket(ctx->socketHandle); + delete ctx; + return nullptr; + } + + return (UDPC_HContext) ctx; +} + +UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId, + int isClient) { + UDPC::Context *ctx = (UDPC::Context *)UDPC_init(listenId, isClient); + if(!ctx) { + return nullptr; + } + ctx->flags.set(0); + ctx->thread = std::thread(UDPC::threadedUpdate, ctx); + + return (UDPC_HContext) ctx; +} + +void UDPC_destroy(UDPC_HContext ctx) { + UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx); + if(UDPC_ctx) { + if(UDPC_ctx->flags.test(0)) { + UDPC_ctx->threadRunning.store(false); + UDPC_ctx->thread.join(); + } + delete UDPC_ctx; + } +} + +void UDPC_update(UDPC_HContext ctx) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c || c->flags.test(0)) { + // invalid or is threaded, update should not be called + return; + } + + c->update_impl(); +} + void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c || !c->flags.test(1)) { -- 2.49.0