]> git.seodisparate.com - UDPConnection/commitdiff
Impl add to send-queue and recv callbacks, WIP
authorStephen Seo <seo.disparate@gmail.com>
Wed, 20 Feb 2019 06:13:57 +0000 (15:13 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Wed, 20 Feb 2019 06:13:57 +0000 (15:13 +0900)
Still not finished, need to set up the thread to send queued packets and
receive packets safely.

Also need to add to Deque (see TODO comment).

src/UDPC_Defines.h
src/UDPC_Deque.h
src/UDPConnection.c
src/UDPConnection.h

index 08de5d2b2669bd0af2126bf9b361e8daef2a9fcd..6bf767e06157b23ec727e29a60c0bacd9bff3325 100644 (file)
@@ -52,4 +52,6 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread";
 #define UDPC_PACKET_TIMEOUT_SEC 1.0f
 #define UDPC_GOOD_RTT_LIMIT_SEC 0.25f
 
+#define UDPC_REC_PKTS_ALLOC_SIZE 64
+
 #endif
index c3fa935fecb4d0f8e3ef42e94be00a8f026ba0ec..9434cfff69d4e6e6fd4a43a17cea83c2b83c1f04 100644 (file)
@@ -40,6 +40,8 @@ int UDPC_Deque_push_back(UDPC_Deque *deque, const void *data, uint32_t size);
  */
 int UDPC_Deque_push_front(UDPC_Deque *deque, const void *data, uint32_t size);
 
+// TODO add push_back/push_front variants that realloc on not enough free space
+
 /*!
  * \return size in bytes of available data
  */
index 2e171139ebb20e0f129be3ee29dbb0965601aae6..0fa21a36690d08e38988e77381b602d8ca432278 100644 (file)
@@ -9,10 +9,23 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
 {
     UDPC_Context *context = malloc(sizeof(UDPC_Context));
     context->error = UDPC_SUCCESS;
-    context->flags = 0;
+    context->flags = 0xC;
+    if(isClient != 0) context->flags |= 0x2;
     context->threadFlags = 0;
+    context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
+    timespec_get(&context->lastUpdated, TIME_UTC);
     context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0;
-    if(isClient != 0) context->flags |= 0x2;
+    context->receivedPackets = UDPC_Deque_init(
+        UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo));
+    context->callbackConnected = NULL;
+    context->callbackConnectedUserData = NULL;
+    context->callbackDisconnected = NULL;
+    context->callbackDisconnectedUserData = NULL;
+    context->callbackReceived = NULL;
+    context->callbackReceivedUserData = NULL;
+
+    // seed rand
+    srand(context->lastUpdated.tv_sec);
 
     // create socket
     context->socketHandle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
@@ -20,7 +33,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
     {
         context->socketHandle = 0;
         context->error = UDPC_ERR_SOCKETFAIL;
-        fprintf(stderr, "Failed to create socket\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to create socket");
         return context;
     }
 
@@ -37,7 +50,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
         context->error = UDPC_ERR_SOCKETBINDF;
         CleanupSocket(context->socketHandle);
         context->socketHandle = 0;
-        fprintf(stderr, "Failed to bind socket\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to bind socket");
         return context;
     }
 
@@ -56,19 +69,13 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
         context->error = UDPC_ERR_SOCKETNONBF;
         CleanupSocket(context->socketHandle);
         context->socketHandle = 0;
-        fprintf(stderr, "Failed to set non-blocking on socket\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to set non-blocking on socket");
 #if UDPC_PLATFORM == UDPC_PLATFORM_UNKNOWN
-        fprintf(stderr, "(Unknown platform)\n");
+        UDPC_INTERNAL_log(context, 0, "(Unknown platform)");
 #endif
         return context;
     }
 
-    context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
-
-    timespec_get(&context->lastUpdated, TIME_UTC);
-
-    context->flags |= (0x8 | 0x4);
-
     return context;
 }
 
@@ -81,8 +88,9 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
     {
         CleanupSocket(context->socketHandle);
         context->socketHandle = 0;
-        fprintf(stderr, "Failed to create mutex\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to create mutex");
         context->error = UDPC_ERR_MTXFAIL;
+        return context;
     }
     context->error = UDPC_SUCCESS;
 
@@ -92,7 +100,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
         CleanupSocket(context->socketHandle);
         context->socketHandle = 0;
         mtx_destroy(&context->tCVMtx);
-        fprintf(stderr, "Failed to create mutex\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to create mutex");
         context->error = UDPC_ERR_MTXFAIL;
         return context;
     }
@@ -105,7 +113,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
         context->socketHandle = 0;
         mtx_destroy(&context->tCVMtx);
         mtx_destroy(&context->tflagsMtx);
-        fprintf(stderr, "Failed to create condition variable\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to create condition variable");
         context->error = UDPC_ERR_CVFAIL;
         return context;
     }
@@ -120,7 +128,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
         mtx_destroy(&context->tCVMtx);
         mtx_destroy(&context->tflagsMtx);
         cnd_destroy(&context->threadCV);
-        fprintf(stderr, "Failed to create thread\n");
+        UDPC_INTERNAL_log(context, 0, "Failed to create thread");
         context->error = UDPC_ERR_THREADFAIL;
         return context;
     }
@@ -177,6 +185,87 @@ void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data)
     UDPC_Deque_destroy(cd->sendPktQueue);
 }
 
+void UDPC_set_callback_connected(
+    UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData)
+{
+    ctx->callbackConnected = fptr;
+    ctx->callbackConnectedUserData = userData;
+}
+
+void UDPC_set_callback_disconnected(
+    UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData)
+{
+    ctx->callbackDisconnected = fptr;
+    ctx->callbackDisconnectedUserData = userData;
+}
+
+void UDPC_set_callback_received(
+    UDPC_Context *ctx, UDPC_callback_received fptr, void *userData)
+{
+    ctx->callbackReceived = fptr;
+    ctx->callbackReceivedUserData = userData;
+}
+
+void UDPC_check_received(UDPC_Context *ctx)
+{
+    // TODO use a lock on receivedPackets
+    if(ctx->callbackReceived)
+    {
+        for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
+        {
+            UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(
+                ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x);
+            ctx->callbackReceived(ctx->callbackReceivedUserData, pinfo->data, pinfo->size);
+            free(pinfo->data);
+        }
+        UDPC_Deque_clear(ctx->receivedPackets);
+    }
+    else
+    {
+        UDPC_INTERNAL_log(ctx, 0, "Received callback not set");
+        for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
+        {
+            UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(
+                ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x);
+            free(pinfo->data);
+        }
+        UDPC_Deque_clear(ctx->receivedPackets);
+    }
+}
+
+int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size)
+{
+    UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr);
+    if(cd)
+    {
+        UDPC_INTERNAL_PacketInfo pinfo = {
+            addr,
+            0,
+            isChecked != 0 ? 0x2 : 0,
+            malloc(size),
+            size,
+            {0, 0}
+        };
+        if(pinfo.data)
+        {
+            memcpy(pinfo.data, data, size);
+            UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo));
+            return 1;
+        }
+        else
+        {
+            UDPC_INTERNAL_log(ctx, 0, "Failed to allocate memory to new send-packet queue entry");
+            return 0;
+        }
+    }
+    else
+    {
+        UDPC_INTERNAL_log(ctx, 0, "Cannot send to %s when connection has not been esablished",
+            UDPC_INTERNAL_atostr(ctx, addr));
+        return 0;
+    }
+}
+
 uint32_t UDPC_get_error(UDPC_Context *ctx)
 {
     uint32_t error = ctx->error;
@@ -233,7 +322,7 @@ void UDPC_update(UDPC_Context *ctx)
         ctx
     };
     timespec_get(&us.tsNow, TIME_UTC);
-    us.dt = UDPC_ts_diff_to_seconds(&us.tsNow, &ctx->lastUpdated);
+    us.dt = UDPC_INTERNAL_ts_diff(&us.tsNow, &ctx->lastUpdated);
     ctx->lastUpdated = us.tsNow;
     us.removedQueue = UDPC_Deque_init(4 * (ctx->conMap->size));
 
@@ -301,7 +390,7 @@ void UDPC_update(UDPC_Context *ctx)
                 ntohs(receivedData.sin_port));
             UDPC_INTERNAL_ConnectionData newCD = {
                 1,
-                0,
+                (ctx->flags & 0x2) != 0 ? conID : UDPC_INTERNAL_generate_id(ctx),
                 0,
                 0,
                 0xFFFFFFFF,
@@ -317,9 +406,13 @@ void UDPC_update(UDPC_Context *ctx)
                 {0, 0},
                 0.0f
             };
-            timespec_get(&newCD.received, TIME_UTC);
-            timespec_get(&newCD.sent, TIME_UTC);
+            newCD.received = us.tsNow;
+            newCD.sent = us.tsNow;
             UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD);
+            if(ctx->callbackConnected)
+            {
+                ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr);
+            }
         }
         return;
     }
@@ -418,7 +511,23 @@ void UDPC_update(UDPC_Context *ctx)
             UDPC_INTERNAL_atostr(ctx, cd->addr));
     }
 
-    // TODO received packet callback here
+    if(bytes > 20)
+    {
+        UDPC_INTERNAL_PacketInfo receivedInfo;
+        receivedInfo.addr = receivedData.sin_addr.s_addr;
+        receivedInfo.id = conID;
+        receivedInfo.flags = (isNotRecvCheck != 0 ? 0 : 0x2) | (isResent != 0 ? 0x5 : 0);
+        receivedInfo.data = malloc(bytes - 20);
+        memcpy(receivedInfo.data, ctx->recvBuf + 20, bytes - 20);
+        receivedInfo.size = bytes - 20;
+        receivedInfo.sent = us.tsNow;
+
+        UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
+    }
+    else
+    {
+        UDPC_INTERNAL_log(ctx, 3, "Packet has no payload, not adding to received queue");
+    }
 }
 
 void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data)
@@ -428,12 +537,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data)
     UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data;
 
     // check for timed out connection
-    if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS)
+    if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS)
     {
         UDPC_Deque_push_back(us->removedQueue, &addr, 4);
         UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d",
             UDPC_INTERNAL_atostr(us->ctx, addr),
             cd->port);
+        if(us->ctx->callbackDisconnected)
+        {
+            us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr);
+        }
         return;
     }
 
@@ -513,7 +626,7 @@ void UDPC_INTERNAL_update_send(void *userData, uint32_t addr, char *data)
     if(cd->sendPktQueue->size == 0)
     {
         // send packet queue is empty, send heartbeat packet
-        if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL)
+        if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL)
         {
             return;
         }
@@ -653,7 +766,7 @@ void UDPC_INTERNAL_update_rtt(
         UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(cd->sentPkts, sizeof(UDPC_INTERNAL_PacketInfo), x);
         if(pinfo->id == rseq)
         {
-            float diff = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent);
+            float diff = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent);
             if(diff > cd->rtt)
             {
                 cd->rtt += (diff - cd->rtt) / 10.0f;
@@ -705,7 +818,7 @@ void UDPC_INTERNAL_check_pkt_timeout(
                     // is not received checked or already resent
                     break;
                 }
-                float seconds = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent);
+                float seconds = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent);
                 if(seconds >= UDPC_PACKET_TIMEOUT_SEC)
                 {
                     if(pinfo->size <= 20)
@@ -747,7 +860,7 @@ void UDPC_INTERNAL_check_pkt_timeout(
     }
 }
 
-float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1)
+float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1)
 {
     float sec = 0.0f;
     if(!ts0 || !ts1)
@@ -814,6 +927,7 @@ int UDPC_INTERNAL_threadfn(void *context)
         }
         mtx_lock(&ctx->tCVMtx);
         cnd_timedwait(&ctx->threadCV, &ctx->tCVMtx, &ts);
+        UDPC_update(ctx);
         mtx_unlock(&ctx->tCVMtx);
 
         mtx_lock(&ctx->tflagsMtx);
@@ -916,3 +1030,25 @@ char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr)
 
     return ctx->atostrBuf;
 }
+
+uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx)
+{
+    uint32_t newID = 0x10000000;
+
+    while(newID == 0x10000000)
+    {
+        newID = rand() % 0x10000000;
+        UDPC_HashMap_itercall(ctx->conMap, UDPC_INTERNAL_check_ids, &newID);
+    }
+
+    return newID;
+}
+
+void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data)
+{
+    UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data;
+    if(cd->id == *((uint32_t*)userData))
+    {
+        *((uint32_t*)userData) = 0x10000000;
+    }
+}
index a1ab794294e508eb2ff9c15fa3825e34ccc84507..d1290b29fabffb5bd1a3289ce9db6b4d19fc0eb7 100644 (file)
 
 #define UDPC_ATOSTR_BUF_SIZE 16
 
+/// (void *userData, uint32_t address)
+/*!
+ * Note address is in network byte order (usually big-endian)
+ */
+typedef void (*UDPC_callback_connected)(void*, uint32_t);
+
+/// (void *userData, uint32_t address)
+/*!
+ * Note address is in network byte order (usually big-endian)
+ */
+typedef void (*UDPC_callback_disconnected)(void*, uint32_t);
+
+/// (void *userData, char *packetData, uint32_t packetSize)
+/*!
+ * The data pointed to by the packetData argument is to data internally managed
+ * by the UDPC_Context. It will change every time this callback is called so do
+ * not depend on it persisting. This means you should copy the data out of it
+ * when the callback is invoked and work with the copied data.
+ */
+typedef void (*UDPC_callback_received)(void*, char*, uint32_t);
+
 /// This struct should not be used outside of this library
 typedef struct {
     uint32_t addr; // in network order (big-endian)
@@ -37,7 +58,7 @@ typedef struct {
      * 0x4 - has been re-sent
      */
     uint32_t flags;
-    char *data; // no-header in sendPktQueue, header in sentPkts
+    char *data; // no-header in sendPktQueue and receivedPackets, header in sentPkts
     uint32_t size;
     struct timespec sent;
 } UDPC_INTERNAL_PacketInfo;
@@ -94,6 +115,14 @@ typedef struct {
     struct timespec lastUpdated;
     char atostrBuf[UDPC_ATOSTR_BUF_SIZE];
     char recvBuf[UDPC_PACKET_MAX_SIZE];
+    UDPC_Deque *receivedPackets;
+
+    UDPC_callback_connected callbackConnected;
+    void *callbackConnectedUserData;
+    UDPC_callback_disconnected callbackDisconnected;
+    void *callbackDisconnectedUserData;
+    UDPC_callback_received callbackReceived;
+    void *callbackReceivedUserData;
 } UDPC_Context;
 
 typedef struct {
@@ -111,6 +140,27 @@ void UDPC_destroy(UDPC_Context *ctx);
 
 void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data);
 
+void UDPC_set_callback_connected(
+    UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData);
+
+void UDPC_set_callback_disconnected(
+    UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData);
+
+void UDPC_set_callback_received(
+    UDPC_Context *ctx, UDPC_callback_received fptr, void *userData);
+
+void UDPC_check_received(UDPC_Context *ctx);
+
+/*!
+ * \brief Queues a packet to send to a connected peer
+ * Note addr is expected to be in network-byte-order (big-endian).
+ * If isChecked is non-zero, UDPC will attempt to resend the packet if peer has
+ * not received it within UDPC_PACKET_TIMEOUT_SEC seconds.
+ * \return non-zero on success
+ */
+int UDPC_queue_send(
+    UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size);
+
 uint32_t UDPC_get_error(UDPC_Context *ctx);
 
 const char* UDPC_get_error_str(uint32_t error);
@@ -146,9 +196,9 @@ void UDPC_INTERNAL_check_pkt_timeout(
     uint32_t ack,
     struct timespec *tsNow);
 
-float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1);
+float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1);
 
-int UDPC_INTERNAL_threadfn(void *context); // internal usage only
+int UDPC_INTERNAL_threadfn(void *context);
 
 /*
  * 0x1 - is ping
@@ -173,4 +223,8 @@ void UDPC_INTERNAL_log(UDPC_Context *ctx, uint32_t level, const char *msg, ...);
 
 char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr);
 
+uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx);
+
+void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data);
+
 #endif