]> git.seodisparate.com - UDPConnection/commitdiff
Many fixes, including sending packets with payload
authorStephen Seo <seo.disparate@gmail.com>
Fri, 20 Sep 2019 05:01:26 +0000 (14:01 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Fri, 20 Sep 2019 05:02:41 +0000 (14:02 +0900)
Implemented UDPC_get_received
Added UDPC_get_list_connected and UDPC_free_list_connected.

cpp_impl/src/TSQueue.hpp
cpp_impl/src/UDPC_Defines.hpp
cpp_impl/src/UDPConnection.cpp
cpp_impl/src/UDPConnection.h
cpp_impl/src/test/TestTSQueue.cpp
cpp_impl/src/test/UDPC_NetworkTest.cpp

index 81dadf59464d00fe3743470cf6d842e539d21ae5..714788feb8cbfece65cd1467e0c7925a9a80ebe9 100644 (file)
@@ -28,7 +28,12 @@ class TSQueue {
     bool pop();
     std::optional<T> top_and_pop();
     void clear();
-    void changeCapacity(unsigned int newCapacity);
+    /*
+     * status ==
+     * 0 - success
+     * 1 - success, but previous size was reduced
+     */
+    void changeCapacity(unsigned int newCapacity, unsigned int *status);
     unsigned int size();
     unsigned int capacity();
     bool empty();
@@ -119,8 +124,15 @@ void TSQueue<T>::clear() {
 }
 
 template <typename T>
-void TSQueue<T>::changeCapacity(unsigned int newCapacity) {
+void TSQueue<T>::changeCapacity(unsigned int newCapacity, unsigned int *status) {
     std::lock_guard<std::mutex> lock(mutex);
+    if(status) {
+        if(rb.getSize() < newCapacity) {
+            *status = 1;
+        } else {
+            *status = 0;
+        }
+    }
     rb.changeCapacity(newCapacity);
 }
 
index 6eefe096edff8f83b504afd48ebb04ff7e0bca17..778cde79f832106d0a033c0794182320e56ffc82 100644 (file)
@@ -5,7 +5,7 @@
 #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
 #define UDPC_SENT_PKTS_MAX_SIZE 33
 #define UDPC_QUEUED_PKTS_MAX_SIZE 32
-#define UDPC_RECEIVED_PKTS_MAX_SIZE 50
+#define UDPC_RECEIVED_PKTS_MAX_SIZE 64
 
 #define UDPC_ID_CONNECT 0x80000000
 #define UDPC_ID_PING 0x40000000
@@ -103,7 +103,6 @@ struct ConnectionData {
     std::deque<UDPC_PacketInfo> sentPkts;
     TSQueue<UDPC_PacketInfo> sendPkts;
     TSQueue<UDPC_PacketInfo> priorityPkts;
-    TSQueue<UDPC_PacketInfo> receivedPkts;
     // pkt id to pkt shared_ptr
     std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap;
     std::chrono::steady_clock::time_point received;
@@ -253,6 +252,7 @@ public:
     std::unordered_map<struct in6_addr, std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher>, IPV6_Hasher> addrConMap;
     // id to ipv6 address and port (as UDPC_ConnectionId)
     std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
+    TSQueue<UDPC_PacketInfo> receivedPkts;
 
     std::default_random_engine rng_engine;
 
index c5c8e63d469fdedb7968ab9276f1fb8ddc86ef07..878d792ac9213f2a6f940940921d54c4deb3677f 100644 (file)
@@ -13,6 +13,7 @@
 #include <ios>
 #include <iomanip>
 #include <regex>
+#include <cstdlib>
 
 #if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS
 #include <netioapi.h>
@@ -90,7 +91,6 @@ port(0),
 sentPkts(),
 sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
 priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
 received(std::chrono::steady_clock::now()),
 sent(std::chrono::steady_clock::now()),
 rtt(std::chrono::steady_clock::duration::zero())
@@ -120,7 +120,6 @@ port(port),
 sentPkts(),
 sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
 priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
 received(std::chrono::steady_clock::now()),
 sent(std::chrono::steady_clock::now()),
 rtt(std::chrono::steady_clock::duration::zero())
@@ -157,9 +156,14 @@ loggingType(UDPC_INFO),
 loggingType(UDPC_WARNING),
 #endif
 atostrBufIndex(0),
+receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
 rng_engine(),
 mutex()
 {
+    for(unsigned int i = 0; i < UDPC_ATOSTR_SIZE; ++i) {
+        atostrBuf[i] = 0;
+    }
+
     if(isThreaded) {
         flags.set(0);
     } else {
@@ -389,7 +393,10 @@ void UDPC::Context::update_impl() {
 
             UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
             destinationInfo.sin6_family = AF_INET6;
-            std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+            std::memcpy(
+                UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+                UDPC_IPV6_ADDR_SUB(iter->first.addr),
+                16);
             destinationInfo.sin6_port = htons(iter->second.port);
             destinationInfo.sin6_flowinfo = 0;
             destinationInfo.sin6_scope_id = iter->first.scope_id;
@@ -411,6 +418,7 @@ void UDPC::Context::update_impl() {
             }
 
             UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
+            pInfo.flags = 0x4;
             pInfo.sender.addr = in6addr_loopback;
             pInfo.receiver.addr = iter->first.addr;
             pInfo.sender.port = socketInfo.sin6_port;
@@ -450,8 +458,13 @@ void UDPC::Context::update_impl() {
 
             UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
             destinationInfo.sin6_family = AF_INET6;
-            std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+            std::memcpy(
+                UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+                UDPC_IPV6_ADDR_SUB(iter->first.addr),
+                16);
             destinationInfo.sin6_port = htons(iter->second.port);
+            destinationInfo.sin6_flowinfo = 0;
+            destinationInfo.sin6_scope_id = iter->first.scope_id;
             long int sentBytes = sendto(
                 socketHandle,
                 buf.get(),
@@ -480,7 +493,7 @@ void UDPC::Context::update_impl() {
                 sentPInfo.sender.port = socketInfo.sin6_port;
                 sentPInfo.receiver.port = iter->second.port;
 
-                iter->second.sentPkts.push_back(std::move(pInfo));
+                iter->second.sentPkts.push_back(std::move(sentPInfo));
                 iter->second.cleanupSentPkts();
             } else {
                 // is not check-received, only id stored in data array
@@ -491,9 +504,9 @@ void UDPC::Context::update_impl() {
                 sentPInfo.receiver.addr = iter->first.addr;
                 sentPInfo.sender.port = socketInfo.sin6_port;
                 sentPInfo.receiver.port = iter->second.port;
-                *((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1;
+                *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
 
-                iter->second.sentPkts.push_back(std::move(pInfo));
+                iter->second.sentPkts.push_back(std::move(sentPInfo));
                 iter->second.cleanupSentPkts();
             }
 
@@ -502,6 +515,7 @@ void UDPC::Context::update_impl() {
             sentPktInfo->id = iter->second.lseq - 1;
             iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
         }
+        iter->second.sent = now;
     }
 
     // receive packet
@@ -785,14 +799,13 @@ void UDPC::Context::update_impl() {
         recPktInfo.sender.port = ntohs(receivedData.sin6_port);
         recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
 
-        if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) {
+        if(!receivedPkts.push(recPktInfo)) {
             log(
                 UDPC_LoggingType::UDPC_WARNING,
                 "receivedPkts is full, removing oldest entry to make room");
-            iter->second.receivedPkts.pop();
+            receivedPkts.pop();
+            receivedPkts.push(recPktInfo);
         }
-
-        iter->second.receivedPkts.push(recPktInfo);
     } else if(bytes == 20) {
         log(
             UDPC_LoggingType::UDPC_VERBOSE,
@@ -903,7 +916,7 @@ void UDPC::threadedUpdate(Context *ctx) {
         ctx->update_impl();
         ctx->mutex.unlock();
         nextNow = std::chrono::steady_clock::now();
-        std::this_thread::sleep_for(std::chrono::milliseconds(33) - (nextNow - now));
+        std::this_thread::sleep_for(std::chrono::milliseconds(11) - (nextNow - now));
     }
 }
 
@@ -1095,7 +1108,7 @@ int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectio
 }
 
 void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
-                     uint32_t isChecked, void *data, uint32_t size) {
+                     int isChecked, void *data, uint32_t size) {
     if(size == 0 || !data) {
         return;
     }
@@ -1123,7 +1136,7 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
     sendInfo.sender.port = c->socketInfo.sin6_port;
     sendInfo.receiver.addr = destinationId.addr;
     sendInfo.receiver.port = iter->second.port;
-    sendInfo.flags = (isChecked ? 0x0 : 0x4);
+    sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4);
 
     iter->second.sendPkts.push(sendInfo);
 }
@@ -1193,6 +1206,36 @@ int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
     return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
 }
 
+UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size) {
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return nullptr;
+    }
+
+    std::lock_guard<std::mutex> lock(c->mutex);
+
+    if(c->conMap.empty()) {
+        return nullptr;
+    }
+    if(size) {
+        *size = c->conMap.size();
+    }
+
+    UDPC_ConnectionId *list = (UDPC_ConnectionId*)std::malloc(
+            sizeof(UDPC_ConnectionId) * (c->conMap.size() + 1));
+    UDPC_ConnectionId *current = list;
+    for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
+        *current = iter->first;
+        ++current;
+    }
+    *current = UDPC_ConnectionId{{0}, 0, 0};
+    return list;
+}
+
+void UDPC_free_list_connected(UDPC_ConnectionId *list) {
+    std::free(list);
+}
+
 uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
@@ -1210,18 +1253,41 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi
     return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
 }
 
-UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx) {
+UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
         return UDPC::get_empty_pinfo();
     }
 
-    std::lock_guard<std::mutex> lock(c->mutex);
-
-    // TODO impl
+    auto opt_pinfo = c->receivedPkts.top_and_pop();
+    if(remaining) {
+        *remaining = c->receivedPkts.size();
+    }
+    if(opt_pinfo) {
+        return *opt_pinfo;
+    }
     return UDPC::get_empty_pinfo();
 }
 
+int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity) {
+    if(newCapacity == 0) {
+        return 0;
+    }
+
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return 0;
+    }
+
+    unsigned int status = 0;
+    c->receivedPkts.changeCapacity(newCapacity, &status);
+    if(status == 1) {
+        c->log(UDPC_LoggingType::UDPC_WARNING,
+            "Received Queue: Previous size was truncated to new capacity");
+    }
+    return 1;
+}
+
 const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
     return UDPC_atostr(ctx, connectionId.addr);
 }
@@ -1255,7 +1321,8 @@ const char *UDPC_atostr(UDPC_HContext ctx, UDPC_IPV6_ADDR_TYPE addr) {
             }
         }
 
-        if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0 && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) {
+        if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0
+                && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) {
             continue;
         } else {
             std::stringstream sstream;
index 93a068b9935ad4b70eb915976ba6f6fd1659845d..3d24e0280ec883414cfcaa0bbea3f0b0ce24c066 100644 (file)
@@ -103,7 +103,7 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
 int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
 
 void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
-                     uint32_t isChecked, void *data, uint32_t size);
+                     int isChecked, void *data, uint32_t size);
 
 int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting);
 
@@ -111,11 +111,17 @@ int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool
 
 int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
 
+UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size);
+
+void UDPC_free_list_connected(UDPC_ConnectionId *list);
+
 uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id);
 
 UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
 
-UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx);
+UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining);
+
+int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity);
 
 const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
 
index fc6dc205cd6cad3c2b571c5ac7aa9d329bbdfd76..9721be401c4b9a4e9c94e4244427f5a4e72c0be3 100644 (file)
@@ -62,7 +62,7 @@ TEST(TSQueue, Usage)
     EXPECT_FALSE(q.push(temp));
     EXPECT_EQ(q.size(), 4);
 
-    q.changeCapacity(8);
+    q.changeCapacity(8, nullptr);
     EXPECT_EQ(q.size(), 4);
 
     temp = 10;
@@ -87,7 +87,7 @@ TEST(TSQueue, Usage)
 
     EXPECT_EQ(400, q.top());
 
-    q.changeCapacity(1);
+    q.changeCapacity(1, nullptr);
 
     // { 10 }
 
index ab9df14dd76baa01834278b21f1c9de4fb8c9aea..4501dcdfdcfc51a298c998136659ebc188fa4eef 100644 (file)
@@ -4,6 +4,7 @@
 #include <thread>
 #include <chrono>
 #include <regex>
+#include <vector>
 
 #include <UDPConnection.h>
 
@@ -15,6 +16,8 @@ void usage() {
     puts("-lp <port> - listen port");
     puts("-cl <addr> - connection addr (client only)");
     puts("-cp <port> - connection port (client only)");
+    puts("-t <tick_count>");
+    puts("-n - do not add payload to packets");
 }
 
 int main(int argc, char **argv) {
@@ -30,6 +33,7 @@ int main(int argc, char **argv) {
     const char *connectionAddr = nullptr;
     const char *connectionPort = nullptr;
     unsigned int tickLimit = 15;
+    bool noPayload = false;
     while(argc > 0) {
         if(std::strcmp(argv[0], "-c") == 0) {
             isClient = true;
@@ -51,6 +55,9 @@ int main(int argc, char **argv) {
             --argc; ++argv;
             tickLimit = std::atoi(argv[0]);
             printf("Set tick limit to %u\n", tickLimit);
+        } else if(std::strcmp(argv[0], "-n") == 0) {
+            noPayload = true;
+            puts("Disabling sending payload");
         } else {
             printf("ERROR: invalid argument \"%s\"\n", argv[0]);
             usage();
@@ -101,11 +108,43 @@ int main(int argc, char **argv) {
     }
     UDPC_set_logging_type(context, UDPC_LoggingType::UDPC_INFO);
     unsigned int tick = 0;
+    unsigned int temp = 0;
+    unsigned int temp2, temp3;
+    UDPC_ConnectionId *list = nullptr;
+    std::vector<unsigned int> sendIds;
+    UDPC_PacketInfo received;
     while(true) {
         std::this_thread::sleep_for(std::chrono::seconds(1));
         if(isClient && UDPC_has_connection(context, connectionId) == 0) {
             UDPC_client_initiate_connection(context, connectionId);
         }
+        if(!noPayload) {
+            list = UDPC_get_list_connected(context, &temp);
+            if(list) {
+                if(sendIds.size() < temp) {
+                    sendIds.resize(temp, 0);
+                } else if(sendIds.size() > temp) {
+                    sendIds.resize(temp);
+                }
+                for(unsigned int i = 0; i < temp; ++i) {
+                    temp2 = UDPC_get_queue_send_available(context, list[i]);
+                    for(unsigned int j = 0; j < temp2; ++j) {
+                        temp3 = htonl(sendIds[i]++);
+                        UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int));
+                    }
+                }
+                UDPC_free_list_connected(list);
+            }
+            do {
+                received = UDPC_get_received(context, &temp);
+                if(received.dataSize == sizeof(unsigned int)) {
+                    if((received.flags & 0x8) != 0) {
+                        temp2 = ntohl(*((unsigned int*)received.data));
+                        printf("Got out of order, data = %u\n", temp2);
+                    }
+                }
+            } while (temp > 0);
+        }
         if(tick++ > tickLimit) {
             break;
         }