diff --git a/cpp_impl/src/UDPC_Defines.hpp b/cpp_impl/src/UDPC_Defines.hpp index 48f1af9..d03d09a 100644 --- a/cpp_impl/src/UDPC_Defines.hpp +++ b/cpp_impl/src/UDPC_Defines.hpp @@ -224,6 +224,8 @@ private: } // }}} public: + void update_impl(); + uint_fast32_t _contextIdentifier; char recvBuf[UDPC_PACKET_MAX_SIZE]; @@ -252,6 +254,9 @@ public: std::default_random_engine rng_engine; + std::thread thread; + std::atomic_bool threadRunning; + }; // struct Context Context *verifyContext(UDPC_HContext ctx); @@ -278,6 +283,8 @@ float timePointsToFSec( UDPC_PacketInfo get_empty_pinfo(); +void threadedUpdate(Context *ctx); + } // namespace UDPC bool operator ==(const UDPC_ConnectionId& a, const UDPC_ConnectionId& b); diff --git a/cpp_impl/src/UDPConnection.cpp b/cpp_impl/src/UDPConnection.cpp index 0a2f775..1b9106e 100644 --- a/cpp_impl/src/UDPConnection.cpp +++ b/cpp_impl/src/UDPConnection.cpp @@ -106,6 +106,597 @@ rng_engine() } rng_engine.seed(std::chrono::system_clock::now().time_since_epoch().count()); + + threadRunning.store(true); +} + +void UDPC::Context::update_impl() { + const auto now = std::chrono::steady_clock::now(); + lastUpdated = now; + + std::chrono::steady_clock::duration temp_dt_fs; + { + // check timed out, check good/bad mode with rtt, remove timed out + std::vector removed; + for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { + temp_dt_fs = now - iter->second.received; + if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) { + removed.push_back(iter->first); + log( + UDPC_LoggingType::VERBOSE, + "Timed out connection with ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + continue; + } + + // check good/bad mode + iter->second.toggleTimer += temp_dt_fs; + iter->second.toggledTimer += temp_dt_fs; + if(iter->second.flags.test(1) && !iter->second.flags.test(2)) { + // good mode, bad rtt + log( + UDPC_LoggingType::INFO, + "Switching to bad mode in connection with ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + iter->second.flags.reset(1); + if(iter->second.toggledTimer <= UDPC::TEN_SECONDS) { + iter->second.toggleT *= 2; + } + iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); + } else if(iter->second.flags.test(1)) { + // good mode, good rtt + if(iter->second.toggleTimer >= UDPC::TEN_SECONDS) { + iter->second.toggleTimer = std::chrono::steady_clock::duration::zero(); + iter->second.toggleT /= 2; + if(iter->second.toggleT < UDPC::ONE_SECOND) { + iter->second.toggleT = UDPC::ONE_SECOND; + } + } + } else if(!iter->second.flags.test(1) && + iter->second.flags.test(2)) { + // bad mode, good rtt + if(iter->second.toggledTimer >= iter->second.toggleT) { + iter->second.toggleTimer = std::chrono::steady_clock::duration::zero(); + iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); + log( + UDPC_LoggingType::INFO, + "Switching to good mode in connection with ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + iter->second.flags.set(1); + } + } else { + // bad mode, bad rtt + iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); + } + + iter->second.timer += temp_dt_fs; + if(iter->second.timer >= (iter->second.flags.test(1) + ? UDPC::GOOD_MODE_SEND_RATE + : UDPC::BAD_MODE_SEND_RATE)) { + iter->second.timer -= (iter->second.flags.test(1) + ? UDPC::GOOD_MODE_SEND_RATE : UDPC::BAD_MODE_SEND_RATE); + iter->second.flags.set(0); + } + } + for(auto iter = removed.begin(); iter != removed.end(); ++iter) { + auto addrConIter = addrConMap.find(iter->addr); + assert(addrConIter != addrConMap.end() + && "addrConMap must have an entry for a current connection"); + auto addrConSetIter = addrConIter->second.find(*iter); + assert(addrConSetIter != addrConIter->second.end() + && "nested set in addrConMap must have an entry for a current connection"); + addrConIter->second.erase(addrConSetIter); + if(addrConIter->second.empty()) { + addrConMap.erase(addrConIter); + } + + auto cIter = conMap.find(*iter); + assert(cIter != conMap.end() + && "conMap must have the entry set to be removed"); + + if(cIter->second.flags.test(4)) { + idMap.erase(cIter->second.id); + } + + conMap.erase(cIter); + } + } + + // update send (only if triggerSend flag is set) + for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { + if(!iter->second.flags.test(0)) { + continue; + } + iter->second.flags.reset(0); + + if(iter->second.flags.test(3)) { + if(flags.test(1)) { + // is initiating connection to server + auto initDT = now - iter->second.sent; + if(initDT < UDPC::INIT_PKT_INTERVAL_DT) { + continue; + } + iter->second.sent = now; + + std::unique_ptr buf = std::make_unique(20); + UDPC::preparePacket( + buf.get(), + protocolID, + 0, + 0, + 0xFFFFFFFF, + nullptr, + 0x1); + + struct sockaddr_in6 destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); + destinationInfo.sin6_port = htons(iter->second.port); + long int sentBytes = sendto( + socketHandle, + buf.get(), + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in6)); + if(sentBytes != 20) { + log( + UDPC_LoggingType::ERROR, + "Failed to send packet to initiate connection to ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + continue; + } + } else { + // is server, initiate connection to client + iter->second.flags.reset(3); + iter->second.sent = now; + + std::unique_ptr buf = std::make_unique(20); + UDPC::preparePacket( + buf.get(), + protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + 0x1); + + struct sockaddr_in6 destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); + destinationInfo.sin6_port = htons(iter->second.port); + long int sentBytes = sendto( + socketHandle, + buf.get(), + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in6)); + if(sentBytes != 20) { + log( + UDPC_LoggingType::ERROR, + "Failed to send packet to initiate connection to ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + continue; + } + } + continue; + } + + // Not initiating connection, send as normal on current connection + if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) { + // nothing in queues, send heartbeat packet + auto sentDT = now - iter->second.sent; + if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) { + continue; + } + + std::unique_ptr buf = std::make_unique(20); + UDPC::preparePacket( + buf.get(), + protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + 0); + + struct sockaddr_in6 destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); + destinationInfo.sin6_port = htons(iter->second.port); + long int sentBytes = sendto( + socketHandle, + buf.get(), + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in6)); + if(sentBytes != 20) { + log( + UDPC_LoggingType::ERROR, + "Failed to send heartbeat packet to ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + continue; + } + + UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); + pInfo.sender.addr = in6addr_loopback; + pInfo.receiver.addr = iter->first.addr; + pInfo.sender.port = socketInfo.sin6_port; + pInfo.receiver.port = iter->second.port; + *((uint32_t*)(pInfo.data + 8)) = iter->second.lseq - 1; + + iter->second.sentPkts.push_back(std::move(pInfo)); + iter->second.cleanupSentPkts(); + + // store other pkt info + UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); + sentPktInfo->id = iter->second.lseq - 1; + iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); + } else { + // sendPkts or priorityPkts not empty + UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); + bool isResending = false; + if(!iter->second.priorityPkts.empty()) { + // TODO verify getting struct copy is valid + pInfo = std::move(iter->second.priorityPkts.top().value()); + iter->second.priorityPkts.pop(); + isResending = true; + } else { + pInfo = std::move(iter->second.sendPkts.top().value()); + iter->second.sendPkts.pop(); + } + std::unique_ptr buf = std::make_unique(20 + pInfo.dataSize); + UDPC::preparePacket( + buf.get(), + protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + (pInfo.flags & 0x4) | (isResending ? 0x8 : 0)); + std::memcpy(buf.get() + 20, pInfo.data, pInfo.dataSize); + + struct sockaddr_in6 destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); + destinationInfo.sin6_port = htons(iter->second.port); + long int sentBytes = sendto( + socketHandle, + buf.get(), + pInfo.dataSize + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in6)); + if(sentBytes != 20 + pInfo.dataSize) { + log( + UDPC_LoggingType::ERROR, + "Failed to send packet to ", + UDPC_atostr((UDPC_HContext)this, iter->first), + ":", + iter->second.port); + continue; + } + + if((pInfo.flags & 0x4) == 0) { + // is check-received, store data in case packet gets lost + UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); + std::memcpy(sentPInfo.data, buf.get(), 20 + pInfo.dataSize); + sentPInfo.flags = 0; + sentPInfo.dataSize = 20 + pInfo.dataSize; + sentPInfo.sender.addr = in6addr_loopback; + sentPInfo.receiver.addr = iter->first.addr; + sentPInfo.sender.port = socketInfo.sin6_port; + sentPInfo.receiver.port = iter->second.port; + + iter->second.sentPkts.push_back(std::move(pInfo)); + iter->second.cleanupSentPkts(); + } else { + // is not check-received, only id stored in data array + UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); + sentPInfo.flags = 0x4; + sentPInfo.dataSize = 0; + sentPInfo.sender.addr = in6addr_loopback; + sentPInfo.receiver.addr = iter->first.addr; + sentPInfo.sender.port = socketInfo.sin6_port; + sentPInfo.receiver.port = iter->second.port; + *((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1; + + iter->second.sentPkts.push_back(std::move(pInfo)); + iter->second.cleanupSentPkts(); + } + + // store other pkt info + UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); + sentPktInfo->id = iter->second.lseq - 1; + iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); + } + } + + // receive packet +#if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS + typedef int socklen_t; +#endif + struct sockaddr_in6 receivedData; + socklen_t receivedDataSize = sizeof(receivedData); + int bytes = recvfrom( + socketHandle, + recvBuf, + UDPC_PACKET_MAX_SIZE, + 0, + (struct sockaddr*) &receivedData, + &receivedDataSize); + + if(bytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // no packet was received + return; + } else if(bytes < 20) { + // packet size is too small, invalid packet + log( + UDPC_LoggingType::INFO, + "Received packet is smaller than header, ignoring packet from ", + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + ":", + receivedData.sin6_port); + return; + } + + uint32_t temp = ntohl(*((uint32_t*)recvBuf)); + if(temp != protocolID) { + // Invalid protocol id in packet + log( + UDPC_LoggingType::INFO, + "Received packet has invalid protocol id, ignoring packet from ", + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + ":", + receivedData.sin6_port); + return; + } + + uint32_t conID = ntohl(*((uint32_t*)(recvBuf + 4))); + uint32_t seqID = ntohl(*((uint32_t*)(recvBuf + 8))); + uint32_t rseq = ntohl(*((uint32_t*)(recvBuf + 12))); + uint32_t ack = htonl(*((uint32_t*)(recvBuf + 16))); + + bool isConnect = conID & UDPC_ID_CONNECT; + bool isPing = conID & UDPC_ID_PING; + bool isNotRecChecked = conID & UDPC_ID_NO_REC_CHK; + bool isResending = conID & UDPC_ID_RESENDING; + conID &= 0x0FFFFFFF; + + UDPC_ConnectionId identifier{receivedData.sin6_addr, ntohs(receivedData.sin6_port)}; + + if(isConnect && flags.test(2)) { + // is connect packet and is accepting new connections + if(!flags.test(1) + && conMap.find(identifier) == conMap.end()) { + // is receiving as server, connection did not already exist + log( + UDPC_LoggingType::VERBOSE, + "Establishing connection with client ", + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + ":", + receivedData.sin6_port); + UDPC::ConnectionData newConnection(true, this); + newConnection.addr = receivedData.sin6_addr; + newConnection.port = ntohs(receivedData.sin6_port); + + idMap.insert(std::make_pair(newConnection.id, identifier)); + conMap.insert(std::make_pair(identifier, std::move(newConnection))); + auto addrConIter = addrConMap.find(identifier.addr); + if(addrConIter == addrConMap.end()) { + auto insertResult = addrConMap.insert( + std::make_pair( + identifier.addr, + std::unordered_set{} + )); + assert(insertResult.second + && "Must successfully insert into addrConMap"); + addrConIter = insertResult.first; + } + addrConIter->second.insert(identifier); + // TODO trigger event server established connection with client + } else if (flags.test(1)) { + // is client + auto iter = conMap.find(identifier); + if(iter == conMap.end() || !iter->second.flags.test(3)) { + return; + } + iter->second.flags.reset(3); + iter->second.id = conID; + iter->second.flags.set(4); + log( + UDPC_LoggingType::VERBOSE, + "Established connection with server ", + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + ":", + receivedData.sin6_port); + // TODO trigger event client established connection with server + } + return; + } + + auto iter = conMap.find(identifier); + if(iter == conMap.end() || iter->second.flags.test(3) + || !iter->second.flags.test(4) || iter->second.id != conID) { + return; + } + else if(isPing) { + iter->second.flags.set(0); + } + + // packet is valid + log( + UDPC_LoggingType::INFO, + "Received valid packet from ", + UDPC_atostr((UDPC_HContext)this, UDPC_ConnectionId{receivedData.sin6_addr, 0}), + ":", + receivedData.sin6_port); + + // update rtt + for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) { + uint32_t id = ntohl(*((uint32_t*)(sentIter->data + 8))); + if(id == rseq) { + auto sentInfoIter = iter->second.sentInfoMap.find(id); + assert(sentInfoIter != iter->second.sentInfoMap.end() + && "sentInfoMap should have known stored id"); + auto diff = now - sentInfoIter->second->sentTime; + if(diff > iter->second.rtt) { + iter->second.rtt += (diff - iter->second.rtt) / 10; + } else { + iter->second.rtt -= (iter->second.rtt - diff) / 10; + } + + iter->second.flags.set(2, iter->second.rtt <= UDPC::GOOD_RTT_LIMIT); + + log( + UDPC_LoggingType::INFO, + "RTT: ", + UDPC::durationToFSec(iter->second.rtt)); + break; + } + } + + iter->second.received = now; + + // check pkt timeout + --rseq; + for(; ack != 0; ack = ack << 1) { + if((ack & 0x80000000) != 0) { + --rseq; + continue; + } + + // pkt not received yet, find it in sent to check if it timed out + for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) { + uint32_t sentID = ntohl(*((uint32_t*)(sentIter->data + 8))); + if(sentID == rseq) { + if((sentIter->flags & 0x4) != 0 || (sentIter->flags & 0x8) != 0) { + // already resent or not rec-checked pkt + break; + } + auto sentInfoIter = iter->second.sentInfoMap.find(sentID); + assert(sentInfoIter != iter->second.sentInfoMap.end() + && "Every entry in sentPkts must have a corresponding entry in sentInfoMap"); + auto duration = now - sentInfoIter->second->sentTime; + if(duration > UDPC::PACKET_TIMEOUT_TIME) { + if(sentIter->dataSize <= 20) { + log( + UDPC_LoggingType::INFO, + "Timed out packet has no payload (probably " + "heartbeat packet), ignoring it"); + sentIter->flags |= 0x8; + break; + } + + UDPC_PacketInfo resendingData = UDPC::get_empty_pinfo(); + resendingData.dataSize = sentIter->dataSize - 20; + std::memcpy(resendingData.data, sentIter->data + 20, resendingData.dataSize); + resendingData.flags = 0; + iter->second.priorityPkts.push(resendingData); + } + break; + } + } + + --rseq; + } + + // calculate sequence and ack + bool isOutOfOrder = false; + uint32_t diff = 0; + if(seqID > iter->second.rseq) { + diff = seqID - iter->second.rseq; + if(diff <= 0x7FFFFFFF) { + // sequence is more recent + iter->second.rseq = seqID; + iter->second.ack = (iter->second.ack >> diff) | 0x80000000; + } else { + // sequence is older, recalc diff + diff = 0xFFFFFFFF - seqID + 1 + iter->second.rseq; + if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) { + // already received packet + log( + UDPC_LoggingType::INFO, + "Received packet is already marked as received, ignoring it"); + return; + } + iter->second.ack |= 0x80000000 >> (diff - 1); + isOutOfOrder = true; + } + } else if(seqID < iter->second.rseq) { + diff = iter->second.rseq - seqID; + if(diff <= 0x7FFFFFFF) { + // sequence is older + if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) { + // already received packet + log( + UDPC_LoggingType::INFO, + "Received packet is already marked as received, ignoring it"); + return; + } + iter->second.ack |= 0x80000000 >> (diff - 1); + isOutOfOrder = true; + } else { + // sequence is more recent, recalc diff + diff = 0xFFFFFFFF - iter->second.rseq + 1 + seqID; + iter->second.rseq = seqID; + iter->second.ack = (iter->second.ack >> diff) | 0x80000000; + } + } else { + // already received packet + log( + UDPC_LoggingType::INFO, + "Received packet is already marked as received, ignoring it"); + return; + } + + if(isOutOfOrder) { + log( + UDPC_LoggingType::VERBOSE, + "Received packet is out of order"); + } + + if(bytes > 20) { + UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo(); + std::memcpy(recPktInfo.data, recvBuf, bytes); + recPktInfo.dataSize = bytes; + recPktInfo.flags = + (isConnect ? 0x1 : 0) + | (isPing ? 0x2 : 0) + | (isNotRecChecked ? 0x4 : 0) + | (isResending ? 0x8 : 0); + recPktInfo.sender.addr = receivedData.sin6_addr; + recPktInfo.receiver.addr = in6addr_loopback; + recPktInfo.sender.port = receivedData.sin6_port; + recPktInfo.receiver.port = socketInfo.sin6_port; + + if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) { + log( + UDPC_LoggingType::WARNING, + "receivedPkts is full, removing oldest entry to make room"); + iter->second.receivedPkts.pop(); + } + + iter->second.receivedPkts.push(recPktInfo); + } else if(bytes == 20) { + log( + UDPC_LoggingType::VERBOSE, + "Received packet has no payload"); + } } UDPC::Context *UDPC::verifyContext(UDPC_HContext ctx) { @@ -200,6 +791,17 @@ UDPC_PacketInfo UDPC::get_empty_pinfo() { }; } +void UDPC::threadedUpdate(Context *ctx) { + auto now = std::chrono::steady_clock::now(); + decltype(now) nextNow; + while(ctx->threadRunning.load()) { + now = std::chrono::steady_clock::now(); + ctx->update_impl(); + nextNow = std::chrono::steady_clock::now(); + std::this_thread::sleep_for(std::chrono::milliseconds(33) - (nextNow - now)); + } +} + UDPC_ConnectionId UDPC_create_id(struct in6_addr addr, uint16_t port) { return UDPC_ConnectionId{addr, port}; } @@ -273,6 +875,7 @@ UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId, return nullptr; } ctx->flags.set(0); + ctx->thread = std::thread(UDPC::threadedUpdate, ctx); return (UDPC_HContext) ctx; } @@ -280,6 +883,10 @@ UDPC_HContext UDPC_init_threaded_update(UDPC_ConnectionId listenId, void UDPC_destroy(UDPC_HContext ctx) { UDPC::Context *UDPC_ctx = UDPC::verifyContext(ctx); if(UDPC_ctx) { + if(UDPC_ctx->flags.test(0)) { + UDPC_ctx->threadRunning.store(false); + UDPC_ctx->thread.join(); + } delete UDPC_ctx; } } @@ -291,592 +898,7 @@ void UDPC_update(UDPC_HContext ctx) { return; } - const auto now = std::chrono::steady_clock::now(); - c->lastUpdated = now; - - std::chrono::steady_clock::duration temp_dt_fs; - { - // check timed out, check good/bad mode with rtt, remove timed out - std::vector removed; - for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { - temp_dt_fs = now - iter->second.received; - if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) { - removed.push_back(iter->first); - c->log( - UDPC_LoggingType::VERBOSE, - "Timed out connection with ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - continue; - } - - // check good/bad mode - iter->second.toggleTimer += temp_dt_fs; - iter->second.toggledTimer += temp_dt_fs; - if(iter->second.flags.test(1) && !iter->second.flags.test(2)) { - // good mode, bad rtt - c->log( - UDPC_LoggingType::INFO, - "Switching to bad mode in connection with ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - iter->second.flags.reset(1); - if(iter->second.toggledTimer <= UDPC::TEN_SECONDS) { - iter->second.toggleT *= 2; - } - iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); - } else if(iter->second.flags.test(1)) { - // good mode, good rtt - if(iter->second.toggleTimer >= UDPC::TEN_SECONDS) { - iter->second.toggleTimer = std::chrono::steady_clock::duration::zero(); - iter->second.toggleT /= 2; - if(iter->second.toggleT < UDPC::ONE_SECOND) { - iter->second.toggleT = UDPC::ONE_SECOND; - } - } - } else if(!iter->second.flags.test(1) && - iter->second.flags.test(2)) { - // bad mode, good rtt - if(iter->second.toggledTimer >= iter->second.toggleT) { - iter->second.toggleTimer = std::chrono::steady_clock::duration::zero(); - iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); - c->log( - UDPC_LoggingType::INFO, - "Switching to good mode in connection with ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - iter->second.flags.set(1); - } - } else { - // bad mode, bad rtt - iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); - } - - iter->second.timer += temp_dt_fs; - if(iter->second.timer >= (iter->second.flags.test(1) - ? UDPC::GOOD_MODE_SEND_RATE - : UDPC::BAD_MODE_SEND_RATE)) { - iter->second.timer -= (iter->second.flags.test(1) - ? UDPC::GOOD_MODE_SEND_RATE : UDPC::BAD_MODE_SEND_RATE); - iter->second.flags.set(0); - } - } - for(auto iter = removed.begin(); iter != removed.end(); ++iter) { - auto addrConIter = c->addrConMap.find(iter->addr); - assert(addrConIter != c->addrConMap.end() - && "addrConMap must have an entry for a current connection"); - auto addrConSetIter = addrConIter->second.find(*iter); - assert(addrConSetIter != addrConIter->second.end() - && "nested set in addrConMap must have an entry for a current connection"); - addrConIter->second.erase(addrConSetIter); - if(addrConIter->second.empty()) { - c->addrConMap.erase(addrConIter); - } - - auto cIter = c->conMap.find(*iter); - assert(cIter != c->conMap.end() - && "conMap must have the entry set to be removed"); - - if(cIter->second.flags.test(4)) { - c->idMap.erase(cIter->second.id); - } - - c->conMap.erase(cIter); - } - } - - // update send (only if triggerSend flag is set) - for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { - if(!iter->second.flags.test(0)) { - continue; - } - iter->second.flags.reset(0); - - if(iter->second.flags.test(3)) { - if(c->flags.test(1)) { - // is initiating connection to server - auto initDT = now - iter->second.sent; - if(initDT < UDPC::INIT_PKT_INTERVAL_DT) { - continue; - } - iter->second.sent = now; - - std::unique_ptr buf = std::make_unique(20); - UDPC::preparePacket( - buf.get(), - c->protocolID, - 0, - 0, - 0xFFFFFFFF, - nullptr, - 0x1); - - struct sockaddr_in6 destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); - destinationInfo.sin6_port = htons(iter->second.port); - long int sentBytes = sendto( - c->socketHandle, - buf.get(), - 20, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(struct sockaddr_in6)); - if(sentBytes != 20) { - c->log( - UDPC_LoggingType::ERROR, - "Failed to send packet to initiate connection to ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - continue; - } - } else { - // is server, initiate connection to client - iter->second.flags.reset(3); - iter->second.sent = now; - - std::unique_ptr buf = std::make_unique(20); - UDPC::preparePacket( - buf.get(), - c->protocolID, - iter->second.id, - iter->second.rseq, - iter->second.ack, - &iter->second.lseq, - 0x1); - - struct sockaddr_in6 destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); - destinationInfo.sin6_port = htons(iter->second.port); - long int sentBytes = sendto( - c->socketHandle, - buf.get(), - 20, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(struct sockaddr_in6)); - if(sentBytes != 20) { - c->log( - UDPC_LoggingType::ERROR, - "Failed to send packet to initiate connection to ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - continue; - } - } - continue; - } - - // Not initiating connection, send as normal on current connection - if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) { - // nothing in queues, send heartbeat packet - auto sentDT = now - iter->second.sent; - if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) { - continue; - } - - std::unique_ptr buf = std::make_unique(20); - UDPC::preparePacket( - buf.get(), - c->protocolID, - iter->second.id, - iter->second.rseq, - iter->second.ack, - &iter->second.lseq, - 0); - - struct sockaddr_in6 destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); - destinationInfo.sin6_port = htons(iter->second.port); - long int sentBytes = sendto( - c->socketHandle, - buf.get(), - 20, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(struct sockaddr_in6)); - if(sentBytes != 20) { - c->log( - UDPC_LoggingType::ERROR, - "Failed to send heartbeat packet to ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - continue; - } - - UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); - pInfo.sender.addr = in6addr_loopback; - pInfo.receiver.addr = iter->first.addr; - pInfo.sender.port = c->socketInfo.sin6_port; - pInfo.receiver.port = iter->second.port; - *((uint32_t*)(pInfo.data + 8)) = iter->second.lseq - 1; - - iter->second.sentPkts.push_back(std::move(pInfo)); - iter->second.cleanupSentPkts(); - - // store other pkt info - UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); - sentPktInfo->id = iter->second.lseq - 1; - iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); - } else { - // sendPkts or priorityPkts not empty - UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); - bool isResending = false; - if(!iter->second.priorityPkts.empty()) { - // TODO verify getting struct copy is valid - pInfo = std::move(iter->second.priorityPkts.top().value()); - iter->second.priorityPkts.pop(); - isResending = true; - } else { - pInfo = std::move(iter->second.sendPkts.top().value()); - iter->second.sendPkts.pop(); - } - std::unique_ptr buf = std::make_unique(20 + pInfo.dataSize); - UDPC::preparePacket( - buf.get(), - c->protocolID, - iter->second.id, - iter->second.rseq, - iter->second.ack, - &iter->second.lseq, - (pInfo.flags & 0x4) | (isResending ? 0x8 : 0)); - std::memcpy(buf.get() + 20, pInfo.data, pInfo.dataSize); - - struct sockaddr_in6 destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy(destinationInfo.sin6_addr.s6_addr, iter->first.addr.s6_addr, 16); - destinationInfo.sin6_port = htons(iter->second.port); - long int sentBytes = sendto( - c->socketHandle, - buf.get(), - pInfo.dataSize + 20, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(struct sockaddr_in6)); - if(sentBytes != 20 + pInfo.dataSize) { - c->log( - UDPC_LoggingType::ERROR, - "Failed to send packet to ", - UDPC_atostr(ctx, iter->first), - ":", - iter->second.port); - continue; - } - - if((pInfo.flags & 0x4) == 0) { - // is check-received, store data in case packet gets lost - UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); - std::memcpy(sentPInfo.data, buf.get(), 20 + pInfo.dataSize); - sentPInfo.flags = 0; - sentPInfo.dataSize = 20 + pInfo.dataSize; - sentPInfo.sender.addr = in6addr_loopback; - sentPInfo.receiver.addr = iter->first.addr; - sentPInfo.sender.port = c->socketInfo.sin6_port; - sentPInfo.receiver.port = iter->second.port; - - iter->second.sentPkts.push_back(std::move(pInfo)); - iter->second.cleanupSentPkts(); - } else { - // is not check-received, only id stored in data array - UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); - sentPInfo.flags = 0x4; - sentPInfo.dataSize = 0; - sentPInfo.sender.addr = in6addr_loopback; - sentPInfo.receiver.addr = iter->first.addr; - sentPInfo.sender.port = c->socketInfo.sin6_port; - sentPInfo.receiver.port = iter->second.port; - *((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1; - - iter->second.sentPkts.push_back(std::move(pInfo)); - iter->second.cleanupSentPkts(); - } - - // store other pkt info - UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); - sentPktInfo->id = iter->second.lseq - 1; - iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); - } - } - - // receive packet -#if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS - typedef int socklen_t; -#endif - struct sockaddr_in6 receivedData; - socklen_t receivedDataSize = sizeof(receivedData); - int bytes = recvfrom( - c->socketHandle, - c->recvBuf, - UDPC_PACKET_MAX_SIZE, - 0, - (struct sockaddr*) &receivedData, - &receivedDataSize); - - if(bytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - // no packet was received - return; - } else if(bytes < 20) { - // packet size is too small, invalid packet - c->log( - UDPC_LoggingType::INFO, - "Received packet is smaller than header, ignoring packet from ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); - return; - } - - uint32_t temp = ntohl(*((uint32_t*)c->recvBuf)); - if(temp != c->protocolID) { - // Invalid protocol id in packet - c->log( - UDPC_LoggingType::INFO, - "Received packet has invalid protocol id, ignoring packet from ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); - return; - } - - uint32_t conID = ntohl(*((uint32_t*)(c->recvBuf + 4))); - uint32_t seqID = ntohl(*((uint32_t*)(c->recvBuf + 8))); - uint32_t rseq = ntohl(*((uint32_t*)(c->recvBuf + 12))); - uint32_t ack = htonl(*((uint32_t*)(c->recvBuf + 16))); - - bool isConnect = conID & UDPC_ID_CONNECT; - bool isPing = conID & UDPC_ID_PING; - bool isNotRecChecked = conID & UDPC_ID_NO_REC_CHK; - bool isResending = conID & UDPC_ID_RESENDING; - conID &= 0x0FFFFFFF; - - UDPC_ConnectionId identifier{receivedData.sin6_addr, ntohs(receivedData.sin6_port)}; - - if(isConnect && c->flags.test(2)) { - // is connect packet and is accepting new connections - if(!c->flags.test(1) - && c->conMap.find(identifier) == c->conMap.end()) { - // is receiving as server, connection did not already exist - c->log( - UDPC_LoggingType::VERBOSE, - "Establishing connection with client ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); - UDPC::ConnectionData newConnection(true, c); - newConnection.addr = receivedData.sin6_addr; - newConnection.port = ntohs(receivedData.sin6_port); - - c->idMap.insert(std::make_pair(newConnection.id, identifier)); - c->conMap.insert(std::make_pair(identifier, std::move(newConnection))); - auto addrConIter = c->addrConMap.find(identifier.addr); - if(addrConIter == c->addrConMap.end()) { - auto insertResult = c->addrConMap.insert( - std::make_pair( - identifier.addr, - std::unordered_set{} - )); - assert(insertResult.second - && "Must successfully insert into addrConMap"); - addrConIter = insertResult.first; - } - addrConIter->second.insert(identifier); - // TODO trigger event server established connection with client - } else if (c->flags.test(1)) { - // is client - auto iter = c->conMap.find(identifier); - if(iter == c->conMap.end() || !iter->second.flags.test(3)) { - return; - } - iter->second.flags.reset(3); - iter->second.id = conID; - iter->second.flags.set(4); - c->log( - UDPC_LoggingType::VERBOSE, - "Established connection with server ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); - // TODO trigger event client established connection with server - } - return; - } - - auto iter = c->conMap.find(identifier); - if(iter == c->conMap.end() || iter->second.flags.test(3) - || !iter->second.flags.test(4) || iter->second.id != conID) { - return; - } - else if(isPing) { - iter->second.flags.set(0); - } - - // packet is valid - c->log( - UDPC_LoggingType::INFO, - "Received valid packet from ", - UDPC_atostr(ctx, UDPC_ConnectionId{receivedData.sin6_addr, 0}), - ":", - receivedData.sin6_port); - - // update rtt - for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) { - uint32_t id = ntohl(*((uint32_t*)(sentIter->data + 8))); - if(id == rseq) { - auto sentInfoIter = iter->second.sentInfoMap.find(id); - assert(sentInfoIter != iter->second.sentInfoMap.end() - && "sentInfoMap should have known stored id"); - auto diff = now - sentInfoIter->second->sentTime; - if(diff > iter->second.rtt) { - iter->second.rtt += (diff - iter->second.rtt) / 10; - } else { - iter->second.rtt -= (iter->second.rtt - diff) / 10; - } - - iter->second.flags.set(2, iter->second.rtt <= UDPC::GOOD_RTT_LIMIT); - - c->log( - UDPC_LoggingType::INFO, - "RTT: ", - UDPC::durationToFSec(iter->second.rtt)); - break; - } - } - - iter->second.received = now; - - // check pkt timeout - --rseq; - for(; ack != 0; ack = ack << 1) { - if((ack & 0x80000000) != 0) { - --rseq; - continue; - } - - // pkt not received yet, find it in sent to check if it timed out - for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) { - uint32_t sentID = ntohl(*((uint32_t*)(sentIter->data + 8))); - if(sentID == rseq) { - if((sentIter->flags & 0x4) != 0 || (sentIter->flags & 0x8) != 0) { - // already resent or not rec-checked pkt - break; - } - auto sentInfoIter = iter->second.sentInfoMap.find(sentID); - assert(sentInfoIter != iter->second.sentInfoMap.end() - && "Every entry in sentPkts must have a corresponding entry in sentInfoMap"); - auto duration = now - sentInfoIter->second->sentTime; - if(duration > UDPC::PACKET_TIMEOUT_TIME) { - if(sentIter->dataSize <= 20) { - c->log( - UDPC_LoggingType::INFO, - "Timed out packet has no payload (probably " - "heartbeat packet), ignoring it"); - sentIter->flags |= 0x8; - break; - } - - UDPC_PacketInfo resendingData = UDPC::get_empty_pinfo(); - resendingData.dataSize = sentIter->dataSize - 20; - std::memcpy(resendingData.data, sentIter->data + 20, resendingData.dataSize); - resendingData.flags = 0; - iter->second.priorityPkts.push(resendingData); - } - break; - } - } - - --rseq; - } - - // calculate sequence and ack - bool isOutOfOrder = false; - uint32_t diff = 0; - if(seqID > iter->second.rseq) { - diff = seqID - iter->second.rseq; - if(diff <= 0x7FFFFFFF) { - // sequence is more recent - iter->second.rseq = seqID; - iter->second.ack = (iter->second.ack >> diff) | 0x80000000; - } else { - // sequence is older, recalc diff - diff = 0xFFFFFFFF - seqID + 1 + iter->second.rseq; - if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) { - // already received packet - c->log( - UDPC_LoggingType::INFO, - "Received packet is already marked as received, ignoring it"); - return; - } - iter->second.ack |= 0x80000000 >> (diff - 1); - isOutOfOrder = true; - } - } else if(seqID < iter->second.rseq) { - diff = iter->second.rseq - seqID; - if(diff <= 0x7FFFFFFF) { - // sequence is older - if((iter->second.ack & (0x80000000 >> (diff - 1))) != 0) { - // already received packet - c->log( - UDPC_LoggingType::INFO, - "Received packet is already marked as received, ignoring it"); - return; - } - iter->second.ack |= 0x80000000 >> (diff - 1); - isOutOfOrder = true; - } else { - // sequence is more recent, recalc diff - diff = 0xFFFFFFFF - iter->second.rseq + 1 + seqID; - iter->second.rseq = seqID; - iter->second.ack = (iter->second.ack >> diff) | 0x80000000; - } - } else { - // already received packet - c->log( - UDPC_LoggingType::INFO, - "Received packet is already marked as received, ignoring it"); - return; - } - - if(isOutOfOrder) { - c->log( - UDPC_LoggingType::VERBOSE, - "Received packet is out of order"); - } - - if(bytes > 20) { - UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo(); - std::memcpy(recPktInfo.data, c->recvBuf, bytes); - recPktInfo.dataSize = bytes; - recPktInfo.flags = - (isConnect ? 0x1 : 0) - | (isPing ? 0x2 : 0) - | (isNotRecChecked ? 0x4 : 0) - | (isResending ? 0x8 : 0); - recPktInfo.sender.addr = receivedData.sin6_addr; - recPktInfo.receiver.addr = in6addr_loopback; - recPktInfo.sender.port = receivedData.sin6_port; - recPktInfo.receiver.port = c->socketInfo.sin6_port; - - if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) { - c->log( - UDPC_LoggingType::WARNING, - "receivedPkts is full, removing oldest entry to make room"); - iter->second.receivedPkts.pop(); - } - - iter->second.receivedPkts.push(recPktInfo); - } else if(bytes == 20) { - c->log( - UDPC_LoggingType::VERBOSE, - "Received packet has no payload"); - } + c->update_impl(); } void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {