]> git.seodisparate.com - UDPConnection/commitdiff
Some more work on cpp_impl (still WIP)
authorStephen Seo <seo.disparate@gmail.com>
Thu, 25 Jul 2019 11:51:08 +0000 (20:51 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Thu, 25 Jul 2019 11:51:08 +0000 (20:51 +0900)
cpp_impl/src/TSQueue.hpp
cpp_impl/src/UDPC_Defines.hpp
cpp_impl/src/UDPConnection.cpp
cpp_impl/src/UDPConnection.h

index badb7d2cc1b9c3f1b5c8410501be1ee7a95d110a..7d83dee894f0708f56ba608fd183389480c8feca 100644 (file)
@@ -28,6 +28,7 @@ class TSQueue {
     void clear();
     void changeCapacity(unsigned int newCapacity);
     unsigned int size();
+    bool empty();
 
   private:
     std::atomic_bool spinLock;
@@ -100,4 +101,11 @@ unsigned int TSQueue<T>::size() {
     return size;
 }
 
+template <typename T>
+bool TSQueue<T>::empty() {
+    // No lock required, since this is calling size() that uses a lock
+    unsigned int size = this->size();
+    return size == 0;
+}
+
 #endif
index 29659f73a1ca14788aed7392c2019288696f7384..f6f8ab678769279611bed749d109de635570fcee 100644 (file)
@@ -3,25 +3,37 @@
 
 #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
 #define UDPC_TIMEOUT_SECONDS 10.0f
+#define UDPC_GOOD_MODE_SEND_INTERVAL (1.0f / 30.0f)
+#define UDPC_BAD_MODE_SEND_INTERVAL (1.0f / 10.0f)
+#define UDPC_SENT_PKTS_MAX_SIZE 33
+
+#define UDPC_ID_CONNECT 0x80000000
+#define UDPC_ID_PING 0x40000000
+#define UDPC_ID_NO_REC_CHK 0x20000000
+#define UDPC_ID_RESENDING 0x10000000
 
 #include <atomic>
 #include <bitset>
+#include <chrono>
 #include <cstdint>
 #include <deque>
-#include <chrono>
 #include <unordered_map>
 
-#include "UDPConnection.h"
 #include "TSQueue.hpp"
+#include "UDPConnection.h"
 
 namespace UDPC {
 
+static uint32_t LOCAL_ADDR = 0;
+static const auto INIT_PKT_INTERVAL_DT = std::chrono::seconds(5);
+static const auto HEARTBEAT_PKT_INTERVAL_DT = std::chrono::milliseconds(150);
+
 struct ConnectionData {
     /*
      * 0 - trigger send
      * 1 - is good mode
      * 2 - is good rtt
-     * 3 - initiating connection to server
+     * 3 - initiating connection
      * 4 - is id set
      */
     std::bitset<32> flags;
@@ -35,9 +47,9 @@ struct ConnectionData {
     float toggledTimer;
     uint32_t addr; // in network order
     uint16_t port;
-    std::deque<PacketInfo> sentPkts;
-    TSQueue<PacketInfo> sendPkts;
-    TSQueue<PacketInfo> priorityPkts;
+    std::deque<UDPC_PacketInfo> sentPkts;
+    TSQueue<UDPC_PacketInfo> sendPkts;
+    TSQueue<UDPC_PacketInfo> priorityPkts;
     std::chrono::steady_clock::time_point received;
     std::chrono::steady_clock::time_point sent;
     float rtt;
@@ -48,7 +60,8 @@ struct Context {
 
     uint_fast32_t _contextIdentifier;
     /*
-     * 0 - isThreaded
+     * 0 - is threaded
+     * 1 - is client
      */
     std::bitset<32> flags;
     std::atomic_bool isAcceptNewConnections;
@@ -61,12 +74,24 @@ struct Context {
 
     std::chrono::steady_clock::time_point lastUpdated;
     std::unordered_map<uint32_t, ConnectionData> conMap;
+    std::unordered_map<uint32_t, uint32_t> idMap;
+
 }; // struct Context
 
-ContextverifyContext(void *ctx);
+Context *verifyContext(void *ctx);
 
 bool isBigEndian();
 
+/*
+ * flags:
+ *   - 0x1 - connect
+ *   - 0x2 - ping
+ *   - 0x4 - no_rec_chk
+ *   - 0x8 - resending
+ */
+void preparePacket(char *data, uint32_t protocolID, uint32_t conID,
+                   uint32_t rseq, uint32_t ack, uint32_t *seqID, int flags);
+
 } // namespace UDPC
 
 #endif
index a2ce588eeb207e0133da29e454bb6d14bd6552b7..1a1dec927c52e898eda6061f28ca9e061d80f71e 100644 (file)
@@ -1,7 +1,9 @@
 #include "UDPC_Defines.hpp"
 #include "UDPConnection.h"
 
+#include <cassert>
 #include <chrono>
+#include <cstring>
 #include <optional>
 #include <vector>
 
@@ -19,6 +21,14 @@ UDPC::Context::Context(bool isThreaded)
     } else {
         flags.reset(0);
     }
+
+    if(UDPC::LOCAL_ADDR == 0) {
+        if(UDPC::isBigEndian()) {
+            UDPC::LOCAL_ADDR = 0x7F000001;
+        } else {
+            UDPC::LOCAL_ADDR = 0x0100007F;
+        }
+    }
 }
 
 UDPC::Context *UDPC::verifyContext(void *ctx) {
@@ -47,6 +57,33 @@ bool UDPC::isBigEndian() {
     return *isBigEndian;
 }
 
+void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID,
+                         uint32_t rseq, uint32_t ack, uint32_t *seqID,
+                         int flags) {
+    uint32_t temp;
+
+    temp = htonl(protocolID);
+    std::memcpy(data, &temp, 4);
+    temp = htonl(conID | ((flags & 0x1) != 0 ? UDPC_ID_CONNECT : 0) |
+                 ((flags & 0x2) != 0 ? UDPC_ID_PING : 0) |
+                 ((flags & 0x4) != 0 ? UDPC_ID_NO_REC_CHK : 0) |
+                 ((flags & 0x8) != 0 ? UDPC_ID_RESENDING : 0));
+    std::memcpy(data + 4, &temp, 4);
+
+    if(seqID) {
+        temp = htonl(*seqID);
+        ++(*seqID);
+    } else {
+        temp = 0;
+    }
+    std::memcpy(data + 8, &temp, 4);
+
+    temp = htonl(rseq);
+    std::memcpy(data + 12, &temp, 4);
+    temp = htonl(ack);
+    std::memcpy(data + 16, &temp, 4);
+}
+
 void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) {
     UDPC::Context *ctx = new UDPC::Context(false);
 
@@ -71,6 +108,15 @@ void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) {
         return nullptr;
     }
 
+    // TODO verify this is necessary to get the listen port
+    if(ctx->socketInfo.sin_port == 0) {
+        struct sockaddr_in getInfo;
+        socklen_t size = sizeof(struct sockaddr_in);
+        if(getsockname(ctx->socketHandle, (struct sockaddr *)&getInfo, &size) == 0) {
+            ctx->socketInfo.sin_port = getInfo.sin_port;
+        }
+    }
+
     // set non-blocking on socket
 #if UDPC_PLATFORM == UDPC_PLATFORM_MAC || UDPC_PLATFORM == UDPC_PLATFORM_LINUX
     int nonblocking = 1;
@@ -123,23 +169,203 @@ void UDPC_update(void *ctx) {
 
     std::chrono::steady_clock::duration temp_dt;
     float temp_dt_fs;
-    std::vector<uint32_t> removed;
+    {
+        // check timed out, check good/bad mode with rtt, remove timed out
+        std::vector<uint32_t> removed;
+        for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
+            temp_dt = now - iter->second.received;
+            temp_dt_fs = (float)temp_dt.count() *
+                         (float)decltype(temp_dt)::period::num /
+                         (float)decltype(temp_dt)::period::den;
+            if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) {
+                removed.push_back(iter->first);
+                continue;
+                // TODO log timed out connection
+            }
+
+            // check good/bad mode
+            iter->second.toggleTimer += temp_dt_fs;
+            iter->second.toggledTimer += temp_dt_fs;
+            if(iter->second.flags.test(1) && !iter->second.flags.test(2)) {
+                // good mode, bad rtt
+                // TODO log switching to bad mode
+                iter->second.flags.reset(1);
+                if(iter->second.toggledTimer <= 10.0f) {
+                    iter->second.toggleT *= 2.0f;
+                }
+                iter->second.toggledTimer = 0.0f;
+            } else if(iter->second.flags.test(1)) {
+                // good mode, good rtt
+                if(iter->second.toggleTimer >= 10.0f) {
+                    iter->second.toggleTimer = 0.0f;
+                    iter->second.toggleT /= 2.0f;
+                    if(iter->second.toggleT < 1.0f) {
+                        iter->second.toggleT = 1.0f;
+                    }
+                }
+            } else if(!iter->second.flags.test(1) &&
+                      iter->second.flags.test(2)) {
+                // bad mode, good rtt
+                if(iter->second.toggledTimer >= iter->second.toggleT) {
+                    iter->second.toggleTimer = 0.0f;
+                    iter->second.toggledTimer = 0.0f;
+                    // TODO log switching to good mode
+                    iter->second.flags.set(1);
+                }
+            } else {
+                // bad mode, bad rtt
+                iter->second.toggledTimer = 0.0f;
+            }
+
+            iter->second.timer += temp_dt_fs;
+            if(iter->second.timer >= (iter->second.flags.test(1)
+                                          ? UDPC_GOOD_MODE_SEND_INTERVAL
+                                          : UDPC_BAD_MODE_SEND_INTERVAL)) {
+                iter->second.timer = 0.0f;
+                iter->second.flags.set(0);
+            }
+        }
+        for(auto iter = removed.begin(); iter != removed.end(); ++iter) {
+            auto cIter = c->conMap.find(*iter);
+            assert(cIter != c->conMap.end());
+            if(cIter->second.flags.test(4)) {
+                c->idMap.erase(cIter->second.id);
+            }
+            c->conMap.erase(cIter);
+        }
+    }
+
+    // update send (only if triggerSend flag is set)
     for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
-        temp_dt = now - iter->second.received;
-        temp_dt_fs = (float)temp_dt.count() *
-                     (float)decltype(temp_dt)::period::num /
-                     (float)decltype(temp_dt)::period::den;
-        if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) {
-            removed.push_back(iter->first);
-            // TODO log timed out connection
+        if(!iter->second.flags.test(0)) {
+            continue;
+        }
+        iter->second.flags.reset(0);
+
+        if(iter->second.flags.test(3)) {
+            if(c->flags.test(1)) {
+                // is initiating connection to server
+                auto initDT = now - iter->second.sent;
+                if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
+                    continue;
+                }
+                iter->second.sent = now;
+
+                std::unique_ptr<char[]> buf = std::make_unique<char[]>(20);
+                UDPC::preparePacket(
+                    buf.get(),
+                    c->protocolID,
+                    0,
+                    0,
+                    0xFFFFFFFF,
+                    nullptr,
+                    0x1);
+
+                struct sockaddr_in destinationInfo;
+                destinationInfo.sin_family = AF_INET;
+                destinationInfo.sin_addr.s_addr = iter->first;
+                destinationInfo.sin_port = htons(iter->second.port);
+                long int sentBytes = sendto(
+                    c->socketHandle,
+                    buf.get(),
+                    20,
+                    0,
+                    (struct sockaddr*) &destinationInfo,
+                    sizeof(struct sockaddr_in));
+                if(sentBytes != 20) {
+                    // TODO log fail of sending connection-initiate-packet
+                }
+            } else {
+                // is server, initiate connection to client
+                iter->second.flags.reset(3);
+                iter->second.sent = now;
+
+                std::unique_ptr<char[]> buf = std::make_unique<char[]>(20);
+                UDPC::preparePacket(
+                    buf.get(),
+                    c->protocolID,
+                    iter->second.id,
+                    iter->second.rseq,
+                    iter->second.ack,
+                    &iter->second.lseq,
+                    0x1);
+
+                struct sockaddr_in destinationInfo;
+                destinationInfo.sin_family = AF_INET;
+                destinationInfo.sin_addr.s_addr = iter->first;
+                destinationInfo.sin_port = htons(iter->second.port);
+                long int sentBytes = sendto(
+                    c->socketHandle,
+                    buf.get(),
+                    20,
+                    0,
+                    (struct sockaddr*) &destinationInfo,
+                    sizeof(struct sockaddr_in));
+                if(sentBytes != 20) {
+                    // TODO log fail send init connection packet as server
+                }
+            }
+            continue;
         }
 
-        // check good/bad mode
-        iter->second.toggleTimer += temp_dt_fs;
-        iter->second.toggledTimer += temp_dt_fs;
-        if(iter->second.flags.test(1) && !iter->second.flags.test(2)) {
-            // good mode, bad rtt
-            // TODO
+        // Not initiating connection, send as normal on current connection
+        if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
+            // nothing in queues, send heartbeat packet
+            auto sentDT = now - iter->second.sent;
+            if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
+                continue;
+            }
+
+            std::unique_ptr<char[]> buf = std::make_unique<char[]>(20);
+            UDPC::preparePacket(
+                buf.get(),
+                c->protocolID,
+                iter->second.id,
+                iter->second.rseq,
+                iter->second.ack,
+                &iter->second.lseq,
+                0);
+
+            struct sockaddr_in destinationInfo;
+            destinationInfo.sin_family = AF_INET;
+            destinationInfo.sin_addr.s_addr = iter->first;
+            destinationInfo.sin_port = htons(iter->second.port);
+            long int sentBytes = sendto(
+                c->socketHandle,
+                buf.get(),
+                20,
+                0,
+                (struct sockaddr*) &destinationInfo,
+                sizeof(struct sockaddr_in));
+            if(sentBytes != 20) {
+                // TODO log fail send heartbeat packet
+            }
+
+            UDPC_PacketInfo pInfo{{0}, 0, 0, 0, 0, 0, 0};
+            pInfo.sender = UDPC::LOCAL_ADDR;
+            pInfo.receiver = iter->first;
+            pInfo.senderPort = c->socketInfo.sin_port;
+            pInfo.receiverPort = iter->second.port;
+
+            iter->second.sentPkts.push_back(std::move(pInfo));
+            while(iter->second.sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) {
+                iter->second.sentPkts.pop_front();
+            }
+        } else {
+            // sendPkts or priorityPkts not empty
+            UDPC_PacketInfo pInfo;
+            bool isResending = false;
+            if(!iter->second.priorityPkts.empty()) {
+                // TODO verify getting struct copy is valid
+                pInfo = iter->second.priorityPkts.top();
+                iter->second.priorityPkts.pop();
+                isResending = true;
+            } else {
+                pInfo = iter->second.sendPkts.top();
+                iter->second.sendPkts.pop();
+            }
+            std::unique_ptr<char[]> buf = std::make_unique<char[]>(20 + pInfo.dataSize);
+            // TODO prepare and send packet
         }
     }
 
@@ -197,13 +423,13 @@ UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType) {
     return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
 }
 
-PacketInfo UDPC_get_received(void *ctx) {
+UDPC_PacketInfo UDPC_get_received(void *ctx) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
-        return PacketInfo{{0}, 0, 0, 0, 0, 0};
+        return UDPC_PacketInfo{{0}, 0, 0, 0, 0, 0, 0};
     }
     // TODO impl
-    return PacketInfo{{0}, 0, 0, 0, 0, 0};
+    return UDPC_PacketInfo{{0}, 0, 0, 0, 0, 0, 0};
 }
 
 const char *UDPC_atostr(void *ctx, uint32_t addr) {
index f4570e323addebab11929c0b2e1e1f0a32165828..5cd1cef8d3cf1dd4bf60dcbaa0a95d50e48b7d2c 100644 (file)
@@ -48,12 +48,19 @@ typedef enum { SILENT, ERROR, WARNING, VERBOSE, INFO } UDPC_LoggingType;
 
 typedef struct {
     char data[UDPC_PACKET_MAX_SIZE];
+    /*
+     * 0x1 - connect
+     * 0x2 - ping
+     * 0x4 - no_rec_chk
+     * 0x8 - resending
+     */
+    uint32_t flags;
     uint16_t dataSize; // zero if invalid
     uint32_t sender;
     uint32_t receiver;
     uint16_t senderPort;
     uint16_t receiverPort;
-} PacketInfo;
+} UDPC_PacketInfo;
 
 void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient);
 void *UDPC_init_threaded_update(uint16_t listenPort, uint32_t listenAddr,
@@ -76,7 +83,7 @@ uint32_t UDPC_set_protocol_id(void *ctx, uint32_t id);
 
 UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType);
 
-PacketInfo UDPC_get_received(void *ctx);
+UDPC_PacketInfo UDPC_get_received(void *ctx);
 
 const char *UDPC_atostr(void *ctx, uint32_t addr);