diff --git a/cpp_impl/src/TSQueue.hpp b/cpp_impl/src/TSQueue.hpp index badb7d2..7d83dee 100644 --- a/cpp_impl/src/TSQueue.hpp +++ b/cpp_impl/src/TSQueue.hpp @@ -28,6 +28,7 @@ class TSQueue { void clear(); void changeCapacity(unsigned int newCapacity); unsigned int size(); + bool empty(); private: std::atomic_bool spinLock; @@ -100,4 +101,11 @@ unsigned int TSQueue::size() { return size; } +template +bool TSQueue::empty() { + // No lock required, since this is calling size() that uses a lock + unsigned int size = this->size(); + return size == 0; +} + #endif diff --git a/cpp_impl/src/UDPC_Defines.hpp b/cpp_impl/src/UDPC_Defines.hpp index 29659f7..f6f8ab6 100644 --- a/cpp_impl/src/UDPC_Defines.hpp +++ b/cpp_impl/src/UDPC_Defines.hpp @@ -3,25 +3,37 @@ #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3 #define UDPC_TIMEOUT_SECONDS 10.0f +#define UDPC_GOOD_MODE_SEND_INTERVAL (1.0f / 30.0f) +#define UDPC_BAD_MODE_SEND_INTERVAL (1.0f / 10.0f) +#define UDPC_SENT_PKTS_MAX_SIZE 33 + +#define UDPC_ID_CONNECT 0x80000000 +#define UDPC_ID_PING 0x40000000 +#define UDPC_ID_NO_REC_CHK 0x20000000 +#define UDPC_ID_RESENDING 0x10000000 #include #include +#include #include #include -#include #include -#include "UDPConnection.h" #include "TSQueue.hpp" +#include "UDPConnection.h" namespace UDPC { +static uint32_t LOCAL_ADDR = 0; +static const auto INIT_PKT_INTERVAL_DT = std::chrono::seconds(5); +static const auto HEARTBEAT_PKT_INTERVAL_DT = std::chrono::milliseconds(150); + struct ConnectionData { /* * 0 - trigger send * 1 - is good mode * 2 - is good rtt - * 3 - initiating connection to server + * 3 - initiating connection * 4 - is id set */ std::bitset<32> flags; @@ -35,9 +47,9 @@ struct ConnectionData { float toggledTimer; uint32_t addr; // in network order uint16_t port; - std::deque sentPkts; - TSQueue sendPkts; - TSQueue priorityPkts; + std::deque sentPkts; + TSQueue sendPkts; + TSQueue priorityPkts; std::chrono::steady_clock::time_point received; std::chrono::steady_clock::time_point sent; float rtt; @@ -48,7 +60,8 @@ struct Context { uint_fast32_t _contextIdentifier; /* - * 0 - isThreaded + * 0 - is threaded + * 1 - is client */ std::bitset<32> flags; std::atomic_bool isAcceptNewConnections; @@ -61,12 +74,24 @@ struct Context { std::chrono::steady_clock::time_point lastUpdated; std::unordered_map conMap; + std::unordered_map idMap; + }; // struct Context -Context* verifyContext(void *ctx); +Context *verifyContext(void *ctx); bool isBigEndian(); +/* + * flags: + * - 0x1 - connect + * - 0x2 - ping + * - 0x4 - no_rec_chk + * - 0x8 - resending + */ +void preparePacket(char *data, uint32_t protocolID, uint32_t conID, + uint32_t rseq, uint32_t ack, uint32_t *seqID, int flags); + } // namespace UDPC #endif diff --git a/cpp_impl/src/UDPConnection.cpp b/cpp_impl/src/UDPConnection.cpp index a2ce588..1a1dec9 100644 --- a/cpp_impl/src/UDPConnection.cpp +++ b/cpp_impl/src/UDPConnection.cpp @@ -1,7 +1,9 @@ #include "UDPC_Defines.hpp" #include "UDPConnection.h" +#include #include +#include #include #include @@ -19,6 +21,14 @@ UDPC::Context::Context(bool isThreaded) } else { flags.reset(0); } + + if(UDPC::LOCAL_ADDR == 0) { + if(UDPC::isBigEndian()) { + UDPC::LOCAL_ADDR = 0x7F000001; + } else { + UDPC::LOCAL_ADDR = 0x0100007F; + } + } } UDPC::Context *UDPC::verifyContext(void *ctx) { @@ -47,6 +57,33 @@ bool UDPC::isBigEndian() { return *isBigEndian; } +void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID, + uint32_t rseq, uint32_t ack, uint32_t *seqID, + int flags) { + uint32_t temp; + + temp = htonl(protocolID); + std::memcpy(data, &temp, 4); + temp = htonl(conID | ((flags & 0x1) != 0 ? UDPC_ID_CONNECT : 0) | + ((flags & 0x2) != 0 ? UDPC_ID_PING : 0) | + ((flags & 0x4) != 0 ? UDPC_ID_NO_REC_CHK : 0) | + ((flags & 0x8) != 0 ? UDPC_ID_RESENDING : 0)); + std::memcpy(data + 4, &temp, 4); + + if(seqID) { + temp = htonl(*seqID); + ++(*seqID); + } else { + temp = 0; + } + std::memcpy(data + 8, &temp, 4); + + temp = htonl(rseq); + std::memcpy(data + 12, &temp, 4); + temp = htonl(ack); + std::memcpy(data + 16, &temp, 4); +} + void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) { UDPC::Context *ctx = new UDPC::Context(false); @@ -71,6 +108,15 @@ void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) { return nullptr; } + // TODO verify this is necessary to get the listen port + if(ctx->socketInfo.sin_port == 0) { + struct sockaddr_in getInfo; + socklen_t size = sizeof(struct sockaddr_in); + if(getsockname(ctx->socketHandle, (struct sockaddr *)&getInfo, &size) == 0) { + ctx->socketInfo.sin_port = getInfo.sin_port; + } + } + // set non-blocking on socket #if UDPC_PLATFORM == UDPC_PLATFORM_MAC || UDPC_PLATFORM == UDPC_PLATFORM_LINUX int nonblocking = 1; @@ -123,23 +169,203 @@ void UDPC_update(void *ctx) { std::chrono::steady_clock::duration temp_dt; float temp_dt_fs; - std::vector removed; + { + // check timed out, check good/bad mode with rtt, remove timed out + std::vector removed; + for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { + temp_dt = now - iter->second.received; + temp_dt_fs = (float)temp_dt.count() * + (float)decltype(temp_dt)::period::num / + (float)decltype(temp_dt)::period::den; + if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) { + removed.push_back(iter->first); + continue; + // TODO log timed out connection + } + + // check good/bad mode + iter->second.toggleTimer += temp_dt_fs; + iter->second.toggledTimer += temp_dt_fs; + if(iter->second.flags.test(1) && !iter->second.flags.test(2)) { + // good mode, bad rtt + // TODO log switching to bad mode + iter->second.flags.reset(1); + if(iter->second.toggledTimer <= 10.0f) { + iter->second.toggleT *= 2.0f; + } + iter->second.toggledTimer = 0.0f; + } else if(iter->second.flags.test(1)) { + // good mode, good rtt + if(iter->second.toggleTimer >= 10.0f) { + iter->second.toggleTimer = 0.0f; + iter->second.toggleT /= 2.0f; + if(iter->second.toggleT < 1.0f) { + iter->second.toggleT = 1.0f; + } + } + } else if(!iter->second.flags.test(1) && + iter->second.flags.test(2)) { + // bad mode, good rtt + if(iter->second.toggledTimer >= iter->second.toggleT) { + iter->second.toggleTimer = 0.0f; + iter->second.toggledTimer = 0.0f; + // TODO log switching to good mode + iter->second.flags.set(1); + } + } else { + // bad mode, bad rtt + iter->second.toggledTimer = 0.0f; + } + + iter->second.timer += temp_dt_fs; + if(iter->second.timer >= (iter->second.flags.test(1) + ? UDPC_GOOD_MODE_SEND_INTERVAL + : UDPC_BAD_MODE_SEND_INTERVAL)) { + iter->second.timer = 0.0f; + iter->second.flags.set(0); + } + } + for(auto iter = removed.begin(); iter != removed.end(); ++iter) { + auto cIter = c->conMap.find(*iter); + assert(cIter != c->conMap.end()); + if(cIter->second.flags.test(4)) { + c->idMap.erase(cIter->second.id); + } + c->conMap.erase(cIter); + } + } + + // update send (only if triggerSend flag is set) for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { - temp_dt = now - iter->second.received; - temp_dt_fs = (float)temp_dt.count() * - (float)decltype(temp_dt)::period::num / - (float)decltype(temp_dt)::period::den; - if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) { - removed.push_back(iter->first); - // TODO log timed out connection + if(!iter->second.flags.test(0)) { + continue; + } + iter->second.flags.reset(0); + + if(iter->second.flags.test(3)) { + if(c->flags.test(1)) { + // is initiating connection to server + auto initDT = now - iter->second.sent; + if(initDT < UDPC::INIT_PKT_INTERVAL_DT) { + continue; + } + iter->second.sent = now; + + std::unique_ptr buf = std::make_unique(20); + UDPC::preparePacket( + buf.get(), + c->protocolID, + 0, + 0, + 0xFFFFFFFF, + nullptr, + 0x1); + + struct sockaddr_in destinationInfo; + destinationInfo.sin_family = AF_INET; + destinationInfo.sin_addr.s_addr = iter->first; + destinationInfo.sin_port = htons(iter->second.port); + long int sentBytes = sendto( + c->socketHandle, + buf.get(), + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in)); + if(sentBytes != 20) { + // TODO log fail of sending connection-initiate-packet + } + } else { + // is server, initiate connection to client + iter->second.flags.reset(3); + iter->second.sent = now; + + std::unique_ptr buf = std::make_unique(20); + UDPC::preparePacket( + buf.get(), + c->protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + 0x1); + + struct sockaddr_in destinationInfo; + destinationInfo.sin_family = AF_INET; + destinationInfo.sin_addr.s_addr = iter->first; + destinationInfo.sin_port = htons(iter->second.port); + long int sentBytes = sendto( + c->socketHandle, + buf.get(), + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in)); + if(sentBytes != 20) { + // TODO log fail send init connection packet as server + } + } + continue; } - // check good/bad mode - iter->second.toggleTimer += temp_dt_fs; - iter->second.toggledTimer += temp_dt_fs; - if(iter->second.flags.test(1) && !iter->second.flags.test(2)) { - // good mode, bad rtt - // TODO + // Not initiating connection, send as normal on current connection + if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) { + // nothing in queues, send heartbeat packet + auto sentDT = now - iter->second.sent; + if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) { + continue; + } + + std::unique_ptr buf = std::make_unique(20); + UDPC::preparePacket( + buf.get(), + c->protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + 0); + + struct sockaddr_in destinationInfo; + destinationInfo.sin_family = AF_INET; + destinationInfo.sin_addr.s_addr = iter->first; + destinationInfo.sin_port = htons(iter->second.port); + long int sentBytes = sendto( + c->socketHandle, + buf.get(), + 20, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(struct sockaddr_in)); + if(sentBytes != 20) { + // TODO log fail send heartbeat packet + } + + UDPC_PacketInfo pInfo{{0}, 0, 0, 0, 0, 0, 0}; + pInfo.sender = UDPC::LOCAL_ADDR; + pInfo.receiver = iter->first; + pInfo.senderPort = c->socketInfo.sin_port; + pInfo.receiverPort = iter->second.port; + + iter->second.sentPkts.push_back(std::move(pInfo)); + while(iter->second.sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) { + iter->second.sentPkts.pop_front(); + } + } else { + // sendPkts or priorityPkts not empty + UDPC_PacketInfo pInfo; + bool isResending = false; + if(!iter->second.priorityPkts.empty()) { + // TODO verify getting struct copy is valid + pInfo = iter->second.priorityPkts.top(); + iter->second.priorityPkts.pop(); + isResending = true; + } else { + pInfo = iter->second.sendPkts.top(); + iter->second.sendPkts.pop(); + } + std::unique_ptr buf = std::make_unique(20 + pInfo.dataSize); + // TODO prepare and send packet } } @@ -197,13 +423,13 @@ UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType) { return static_cast(c->loggingType.exchange(loggingType)); } -PacketInfo UDPC_get_received(void *ctx) { +UDPC_PacketInfo UDPC_get_received(void *ctx) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { - return PacketInfo{{0}, 0, 0, 0, 0, 0}; + return UDPC_PacketInfo{{0}, 0, 0, 0, 0, 0, 0}; } // TODO impl - return PacketInfo{{0}, 0, 0, 0, 0, 0}; + return UDPC_PacketInfo{{0}, 0, 0, 0, 0, 0, 0}; } const char *UDPC_atostr(void *ctx, uint32_t addr) { diff --git a/cpp_impl/src/UDPConnection.h b/cpp_impl/src/UDPConnection.h index f4570e3..5cd1cef 100644 --- a/cpp_impl/src/UDPConnection.h +++ b/cpp_impl/src/UDPConnection.h @@ -48,12 +48,19 @@ typedef enum { SILENT, ERROR, WARNING, VERBOSE, INFO } UDPC_LoggingType; typedef struct { char data[UDPC_PACKET_MAX_SIZE]; + /* + * 0x1 - connect + * 0x2 - ping + * 0x4 - no_rec_chk + * 0x8 - resending + */ + uint32_t flags; uint16_t dataSize; // zero if invalid uint32_t sender; uint32_t receiver; uint16_t senderPort; uint16_t receiverPort; -} PacketInfo; +} UDPC_PacketInfo; void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient); void *UDPC_init_threaded_update(uint16_t listenPort, uint32_t listenAddr, @@ -76,7 +83,7 @@ uint32_t UDPC_set_protocol_id(void *ctx, uint32_t id); UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType); -PacketInfo UDPC_get_received(void *ctx); +UDPC_PacketInfo UDPC_get_received(void *ctx); const char *UDPC_atostr(void *ctx, uint32_t addr);