]> git.seodisparate.com - UDPConnection/commitdiff
Rework sendPkts (sending queue)
authorStephen Seo <seo.disparate@gmail.com>
Fri, 27 Sep 2019 11:19:48 +0000 (20:19 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Fri, 27 Sep 2019 11:19:48 +0000 (20:19 +0900)
Changed sendPkts in ConnectionData to std::deque, added a TSQueue
cSendPkts to Context. Queued packets will be moved to corresponding
ConnectionData instances during update, dropping with a warning those
where a connection does not exist.

Minor other fixes.

Some additions to TSQueue.

cpp_impl/src/TSQueue.hpp
cpp_impl/src/UDPC_Defines.hpp
cpp_impl/src/UDPConnection.cpp
cpp_impl/src/UDPConnection.h
cpp_impl/src/test/UDPC_NetworkTest.cpp

index 2352cf868c785af4066a8d0899568ef3f0bb0b4c..049916e14cdc38a262edacb0baf8eab6d696ff7c 100644 (file)
@@ -37,7 +37,9 @@ class TSQueue {
     void changeCapacity(unsigned int newCapacity, unsigned int *status);
     unsigned int size();
     unsigned int capacity();
+    unsigned int remaining_capacity();
     bool empty();
+    bool full();
 
   private:
     std::mutex mutex;
@@ -165,6 +167,13 @@ unsigned int TSQueue<T>::capacity() {
     return capacity;
 }
 
+template <typename T>
+unsigned int TSQueue<T>::remaining_capacity() {
+    std::lock_guard<std::mutex> lock(mutex);
+    unsigned int remaining = rb.getCapacity() - rb.getSize();
+    return remaining;
+}
+
 template <typename T>
 bool TSQueue<T>::empty() {
     // No lock required, since this is calling size() that uses a lock
@@ -172,4 +181,11 @@ bool TSQueue<T>::empty() {
     return size == 0;
 }
 
+template <typename T>
+bool TSQueue<T>::full() {
+    // No lock required, calling remaining_capacity() that uses a lock
+    unsigned int remaining = remaining_capacity();
+    return remaining == 0;
+}
+
 #endif
index 50a11708fe9b8e6c902b24d8f29aca5fb578cad4..25bf295a57d01a84a1b9b12bdc0d597f4e41104f 100644 (file)
@@ -4,7 +4,7 @@
 
 #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
 #define UDPC_SENT_PKTS_MAX_SIZE 33
-#define UDPC_QUEUED_PKTS_MAX_SIZE 32
+#define UDPC_QUEUED_PKTS_MAX_SIZE 64
 #define UDPC_RECEIVED_PKTS_MAX_SIZE 64
 
 #define UDPC_ID_CONNECT 0x80000000
@@ -110,8 +110,8 @@ struct ConnectionData {
     uint32_t scope_id;
     uint16_t port; // in native order
     std::deque<UDPC_PacketInfo> sentPkts;
-    TSQueue<UDPC_PacketInfo> sendPkts;
-    TSQueue<UDPC_PacketInfo> priorityPkts;
+    std::deque<UDPC_PacketInfo> sendPkts;
+    std::deque<UDPC_PacketInfo> priorityPkts;
     // pkt id to pkt shared_ptr
     std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap;
     std::chrono::steady_clock::time_point received;
@@ -266,6 +266,7 @@ public:
     // id to ipv6 address and port (as UDPC_ConnectionId)
     std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
     TSQueue<UDPC_PacketInfo> receivedPkts;
+    TSQueue<UDPC_PacketInfo> cSendPkts;
 
     std::default_random_engine rng_engine;
 
index 1b6ee262350d43e941567021a3a2c617cad87fbb..aba8b354206c2be59a104b3b67cd2007a5c3f339 100644 (file)
@@ -89,8 +89,8 @@ toggledTimer(std::chrono::steady_clock::duration::zero()),
 addr({0}),
 port(0),
 sentPkts(),
-sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
+sendPkts(),
+priorityPkts(),
 received(std::chrono::steady_clock::now()),
 sent(std::chrono::steady_clock::now()),
 rtt(std::chrono::steady_clock::duration::zero())
@@ -125,8 +125,8 @@ addr(addr),
 scope_id(scope_id),
 port(port),
 sentPkts(),
-sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
+sendPkts(),
+priorityPkts(),
 received(std::chrono::steady_clock::now()),
 sent(std::chrono::steady_clock::now()),
 rtt(std::chrono::steady_clock::duration::zero())
@@ -171,6 +171,7 @@ loggingType(UDPC_WARNING),
 #endif
 atostrBufIndex(0),
 receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
+cSendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
 rng_engine(),
 mutex()
 {
@@ -312,6 +313,30 @@ void UDPC::Context::update_impl() {
         }
     }
 
+    // move queued in cSendPkts to existing connection's sendPkts
+    {
+        unsigned int rsize = 0;
+        do {
+            auto next = cSendPkts.top_and_pop_and_rsize(&rsize);
+            if(next) {
+                if(auto iter = conMap.find(next.value().receiver);
+                        iter != conMap.end()) {
+                    iter->second.sendPkts.push_back(next.value());
+                } else {
+                    UDPC_CHECK_LOG(this,
+                        UDPC_LoggingType::UDPC_WARNING,
+                        "Dropped queued packet to ",
+                        UDPC_atostr(
+                            (UDPC_HContext)this,
+                            next.value().receiver.addr),
+                        ", port = ",
+                        next.value().receiver.port,
+                        " due to connection not existing");
+                }
+            }
+        } while(rsize != 0);
+    }
+
     // update send (only if triggerSend flag is set)
     for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
         if(!iter->second.flags.test(0)) {
@@ -479,13 +504,12 @@ void UDPC::Context::update_impl() {
             UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
             bool isResending = false;
             if(!iter->second.priorityPkts.empty()) {
-                // TODO verify getting struct copy is valid
-                pInfo = std::move(iter->second.priorityPkts.top().value());
-                iter->second.priorityPkts.pop();
+                pInfo = iter->second.priorityPkts.front();
+                iter->second.priorityPkts.pop_front();
                 isResending = true;
             } else {
-                pInfo = std::move(iter->second.sendPkts.top().value());
-                iter->second.sendPkts.pop();
+                pInfo = iter->second.sendPkts.front();
+                iter->second.sendPkts.pop_front();
             }
             std::unique_ptr<char[]> buf = std::make_unique<char[]>(UDPC_FULL_HEADER_SIZE + pInfo.dataSize);
             UDPC::preparePacket(
@@ -810,7 +834,7 @@ void UDPC::Context::update_impl() {
                     resendingData.dataSize = sentIter->dataSize - UDPC_FULL_HEADER_SIZE;
                     std::memcpy(resendingData.data, sentIter->data + UDPC_FULL_HEADER_SIZE, resendingData.dataSize);
                     resendingData.flags = 0;
-                    iter->second.priorityPkts.push(resendingData);
+                    iter->second.priorityPkts.push_back(resendingData);
                 }
                 break;
             }
@@ -1210,20 +1234,13 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
     }
 }
 
-int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
+int UDPC_get_queue_send_available(UDPC_HContext ctx) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->mutex);
-
-    auto iter = c->conMap.find(connectionId);
-    if(iter != c->conMap.end()) {
-        return iter->second.sendPkts.capacity() - iter->second.sendPkts.size();
-    } else {
-        return 0;
-    }
+    return c->cSendPkts.remaining_capacity();
 }
 
 void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
@@ -1235,16 +1252,14 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
         return;
-    }
-
-    std::lock_guard<std::mutex> lock(c->mutex);
-
-    auto iter = c->conMap.find(destinationId);
-    if(iter == c->conMap.end()) {
+    } else if(c->cSendPkts.full()) {
         UDPC_CHECK_LOG(c,
             UDPC_LoggingType::UDPC_ERROR,
-            "Failed to add packet to queue, no established connection "
-            "with recipient");
+            "Failed to queue packet to ",
+            UDPC_atostr(ctx, destinationId.addr),
+            ", port = ",
+            destinationId.port,
+            " because queue is full");
         return;
     }
 
@@ -1254,10 +1269,10 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
     sendInfo.sender.addr = in6addr_loopback;
     sendInfo.sender.port = ntohs(c->socketInfo.sin6_port);
     sendInfo.receiver.addr = destinationId.addr;
-    sendInfo.receiver.port = iter->second.port;
+    sendInfo.receiver.port = destinationId.port;
     sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4);
 
-    iter->second.sendPkts.push(sendInfo);
+    c->cSendPkts.push(sendInfo);
 }
 
 int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
@@ -1265,7 +1280,6 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
     if(!c) {
         return 0;
     }
-    std::lock_guard<std::mutex> lock(c->mutex);
     return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true);
 }
 
index d0dbedbcc5645145e1b2e2d7791e6dff2c845cef..1783f3168742fcc2b7633e664e7391f8dcf9ac35 100644 (file)
@@ -105,7 +105,7 @@ void UDPC_update(UDPC_HContext ctx);
 
 void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
 
-int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
+int UDPC_get_queue_send_available(UDPC_HContext ctx);
 
 void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
                      int isChecked, void *data, uint32_t size);
index 4501dcdfdcfc51a298c998136659ebc188fa4eef..9fa55565372a31f3f98d0de94ea661b65bb86823 100644 (file)
@@ -126,12 +126,10 @@ int main(int argc, char **argv) {
                 } else if(sendIds.size() > temp) {
                     sendIds.resize(temp);
                 }
-                for(unsigned int i = 0; i < temp; ++i) {
-                    temp2 = UDPC_get_queue_send_available(context, list[i]);
-                    for(unsigned int j = 0; j < temp2; ++j) {
-                        temp3 = htonl(sendIds[i]++);
-                        UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int));
-                    }
+                temp2 = UDPC_get_queue_send_available(context);
+                for(unsigned int i = 0; i < temp2; ++i) {
+                    temp3 = htonl(sendIds[i % temp]++);
+                    UDPC_queue_send(context, list[i % temp], 0, &temp3, sizeof(unsigned int));
                 }
                 UDPC_free_list_connected(list);
             }