]> git.seodisparate.com - UDPConnection/commitdiff
Fixes/improvements to UDPConnection, still WIP
authorStephen Seo <seo.disparate@gmail.com>
Thu, 21 Feb 2019 06:40:30 +0000 (15:40 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Thu, 21 Feb 2019 06:40:30 +0000 (15:40 +0900)
Need to implement threaded update properly.

src/UDPC_Defines.h
src/UDPConnection.c
src/UDPConnection.h

index 6bf767e06157b23ec727e29a60c0bacd9bff3325..6549a1a23ddfb252e2b16d681af9a739a8a05678 100644 (file)
@@ -46,12 +46,16 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread";
 
 #define UDPC_SENT_PKTS_MAX_SIZE 34
 #define UDPC_SENT_PKTS_ALLOC_SIZE 35
+#define UDPC_SEND_PKTS_ALLOC_SIZE 40
 
 #define UDPC_PACKET_MAX_SIZE 8192
 
 #define UDPC_PACKET_TIMEOUT_SEC 1.0f
 #define UDPC_GOOD_RTT_LIMIT_SEC 0.25f
 
-#define UDPC_REC_PKTS_ALLOC_SIZE 64
+#define UDPC_REC_PKTS_ALLOC_SIZE 128
+
+#define UDPC_CONNECTED_EVENT_SIZE 64
+#define UDPC_DISCONNECTED_EVENT_SIZE 64
 
 #endif
index 0fa21a36690d08e38988e77381b602d8ca432278..50eaa773e77a92805117eea20308a9c7804fa259 100644 (file)
@@ -15,6 +15,10 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
     context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
     timespec_get(&context->lastUpdated, TIME_UTC);
     context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0;
+    context->connectedEvents = UDPC_Deque_init(
+        UDPC_CONNECTED_EVENT_SIZE * 4);
+    context->disconnectedEvents = UDPC_Deque_init(
+        UDPC_DISCONNECTED_EVENT_SIZE * 4);
     context->receivedPackets = UDPC_Deque_init(
         UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo));
     context->callbackConnected = NULL;
@@ -206,9 +210,39 @@ void UDPC_set_callback_received(
     ctx->callbackReceivedUserData = userData;
 }
 
-void UDPC_check_received(UDPC_Context *ctx)
+void UDPC_check_events(UDPC_Context *ctx)
 {
     // TODO use a lock on receivedPackets
+    if(ctx->callbackConnected)
+    {
+        for(int x = 0; x * 4 < ctx->connectedEvents->size; ++x)
+        {
+            ctx->callbackConnected(ctx->callbackConnectedUserData,
+                *((uint32_t*)UDPC_Deque_index_ptr(ctx->connectedEvents, 4, x)));
+        }
+        UDPC_Deque_clear(ctx->connectedEvents);
+    }
+    else
+    {
+        UDPC_INTERNAL_log(ctx, 0, "Connected callback not set");
+        UDPC_Deque_clear(ctx->connectedEvents);
+    }
+
+    if(ctx->callbackDisconnected)
+    {
+        for(int x = 0; x * 4 < ctx->disconnectedEvents->size; ++x)
+        {
+            ctx->callbackDisconnected(ctx->callbackDisconnectedUserData,
+                *((uint32_t*)UDPC_Deque_index_ptr(ctx->disconnectedEvents, 4, x)));
+        }
+        UDPC_Deque_clear(ctx->disconnectedEvents);
+    }
+    else
+    {
+        UDPC_INTERNAL_log(ctx, 0, "Disconnected callback not set");
+        UDPC_Deque_clear(ctx->disconnectedEvents);
+    }
+
     if(ctx->callbackReceived)
     {
         for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
@@ -249,8 +283,16 @@ int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *
         if(pinfo.data)
         {
             memcpy(pinfo.data, data, size);
-            UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo));
-            return 1;
+            if(UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo)) == 0)
+            {
+                UDPC_INTERNAL_log(ctx, 1, "Not enough free space in send "
+                    "packet queue, failed to queue packet for sending");
+                return 0;
+            }
+            else
+            {
+                return 1;
+            }
         }
         else
         {
@@ -266,6 +308,14 @@ int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *
     }
 }
 
+int UDPC_get_queue_send_available(UDPC_Context *ctx, uint32_t addr)
+{
+    UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr);
+    if(!cd) { return 0; }
+
+    return UDPC_Deque_get_available(cd->sendPktQueue) / sizeof(UDPC_INTERNAL_PacketInfo);
+}
+
 uint32_t UDPC_get_error(UDPC_Context *ctx)
 {
     uint32_t error = ctx->error;
@@ -401,7 +451,7 @@ void UDPC_update(UDPC_Context *ctx)
                 receivedData.sin_addr.s_addr,
                 ntohs(receivedData.sin_port),
                 UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE),
-                UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE),
+                UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SEND_PKTS_ALLOC_SIZE),
                 {0, 0},
                 {0, 0},
                 0.0f
@@ -409,9 +459,16 @@ void UDPC_update(UDPC_Context *ctx)
             newCD.received = us.tsNow;
             newCD.sent = us.tsNow;
             UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD);
-            if(ctx->callbackConnected)
+            if(UDPC_Deque_get_available(ctx->connectedEvents) == 0)
             {
-                ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr);
+                UDPC_Deque_pop_front(ctx->connectedEvents, 4);
+                UDPC_Deque_push_back(ctx->connectedEvents, &receivedData.sin_addr.s_addr, 4);
+                UDPC_INTERNAL_log(ctx, 1, "Not enough free space in connected "
+                    "events queue, removing oldest to make room");
+            }
+            else
+            {
+                UDPC_Deque_push_back(ctx->connectedEvents, &receivedData.sin_addr.s_addr, 4);
             }
         }
         return;
@@ -522,7 +579,19 @@ void UDPC_update(UDPC_Context *ctx)
         receivedInfo.size = bytes - 20;
         receivedInfo.sent = us.tsNow;
 
-        UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
+        if(UDPC_Deque_get_available(ctx->receivedPackets) == 0)
+        {
+            UDPC_INTERNAL_PacketInfo *rpinfo = UDPC_Deque_get_front_ptr(
+                ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo));
+            if(rpinfo->data) { free(rpinfo->data); }
+            UDPC_Deque_pop_front(ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo));
+            UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
+            UDPC_INTERNAL_log(ctx, 1, "Received packet but not enough space in received queue, removing oldest packet to make room");
+        }
+        else
+        {
+            UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
+        }
     }
     else
     {
@@ -543,9 +612,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data)
         UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d",
             UDPC_INTERNAL_atostr(us->ctx, addr),
             cd->port);
-        if(us->ctx->callbackDisconnected)
+        if(UDPC_Deque_get_available(us->ctx->disconnectedEvents) == 0)
+        {
+            UDPC_Deque_pop_front(us->ctx->disconnectedEvents, 4);
+            UDPC_Deque_push_back(us->ctx->disconnectedEvents, &addr, 4);
+            UDPC_INTERNAL_log(us->ctx, 1, "Not enough free space in "
+                "disconnected events queue, removing oldest event to make room");
+        }
+        else
         {
-            us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr);
+            UDPC_Deque_push_back(us->ctx->disconnectedEvents, &addr, 4);
         }
         return;
     }
@@ -847,6 +923,7 @@ void UDPC_INTERNAL_check_pkt_timeout(
                     pinfo->flags |= 0x4;
                     pinfo->data = NULL;
                     pinfo->size = 0;
+                    // TODO use separate queue for resending packets
                     UDPC_Deque_push_back(
                         cd->sendPktQueue,
                         &newPkt,
index d1290b29fabffb5bd1a3289ce9db6b4d19fc0eb7..d470bcfb3bdf3834bcac6df0183da9c9d36b33e8 100644 (file)
@@ -115,6 +115,8 @@ typedef struct {
     struct timespec lastUpdated;
     char atostrBuf[UDPC_ATOSTR_BUF_SIZE];
     char recvBuf[UDPC_PACKET_MAX_SIZE];
+    UDPC_Deque *connectedEvents;
+    UDPC_Deque *disconnectedEvents;
     UDPC_Deque *receivedPackets;
 
     UDPC_callback_connected callbackConnected;
@@ -149,7 +151,7 @@ void UDPC_set_callback_disconnected(
 void UDPC_set_callback_received(
     UDPC_Context *ctx, UDPC_callback_received fptr, void *userData);
 
-void UDPC_check_received(UDPC_Context *ctx);
+void UDPC_check_events(UDPC_Context *ctx);
 
 /*!
  * \brief Queues a packet to send to a connected peer
@@ -161,6 +163,12 @@ void UDPC_check_received(UDPC_Context *ctx);
 int UDPC_queue_send(
     UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size);
 
+/*!
+ * \brief get the number of packets that can be queued to the addr
+ * \return number of queueable packets or 0 if connection has not been established
+ */
+int UDPC_get_queue_send_available(UDPC_Context *ctx, uint32_t addr);
+
 uint32_t UDPC_get_error(UDPC_Context *ctx);
 
 const char* UDPC_get_error_str(uint32_t error);