]> git.seodisparate.com - UDPConnection/commitdiff
Replace TSLQueue with std::deque where possible
authorStephen Seo <seo.disparate@gmail.com>
Wed, 8 Jan 2020 10:55:12 +0000 (19:55 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Wed, 8 Jan 2020 10:55:12 +0000 (19:55 +0900)
Also added std::mutex for each new std::deque. cSendPkts is left as a
TSLQueue because it needs to support fast removal from the middle of the
data structure (mainly because the queued packets per ConnectionData has
an imposed limit of packets to hold).

src/UDPC_Defines.hpp
src/UDPConnection.cpp

index aa4135687ce39a06a1a1f97882fed441f2a00707..2114cbc91191137e173b4303c29892c16857efc5 100644 (file)
@@ -235,12 +235,15 @@ public:
     std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
     std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher> deletionMap;
     std::unordered_set<PKContainer, PKContainer> peerPKWhitelist;
-    TSLQueue<UDPC_PacketInfo> receivedPkts;
+    std::deque<UDPC_PacketInfo> receivedPkts;
+    std::mutex receivedPktsMutex;
     TSLQueue<UDPC_PacketInfo> cSendPkts;
     // handled internally
-    TSLQueue<UDPC_Event> internalEvents;
+    std::deque<UDPC_Event> internalEvents;
+    std::mutex internalEventsMutex;
     // handled via interface, if isReceivingEvents is true
-    TSLQueue<UDPC_Event> externalEvents;
+    std::deque<UDPC_Event> externalEvents;
+    std::mutex externalEventsMutex;
 
     std::default_random_engine rng_engine;
 
index 3f100b3ee071b3f89281ade82822166b2d18bdac..1c5a6a4b7865305657965e446bb99f4e31d5f265 100644 (file)
@@ -279,10 +279,12 @@ void UDPC::Context::update_impl() {
     lastUpdated = now;
 
     // handle internalEvents
-    do {
-        auto optE = internalEvents.top_and_pop();
-        if(optE) {
-            switch(optE->type) {
+    {
+        std::lock_guard<std::mutex> intEvLock(internalEventsMutex);
+        while(!internalEvents.empty()) {
+            auto event = internalEvents.front();
+            internalEvents.pop_front();
+            switch(event.type) {
             case UDPC_ET_REQUEST_CONNECT:
             {
                 unsigned char *sk = nullptr;
@@ -294,11 +296,11 @@ void UDPC::Context::update_impl() {
                 UDPC::ConnectionData newCon(
                     false,
                     this,
-                    optE->conId.addr,
-                    optE->conId.scope_id,
-                    optE->conId.port,
+                    event.conId.addr,
+                    event.conId.scope_id,
+                    event.conId.port,
 #ifdef UDPC_LIBSODIUM_ENABLED
-                    flags.test(2) && optE->v.enableLibSodium != 0,
+                    flags.test(2) && event.v.enableLibSodium != 0,
                     sk, pk);
 #else
                     false,
@@ -309,52 +311,52 @@ void UDPC::Context::update_impl() {
                         UDPC_LoggingType::UDPC_ERROR,
                         "Failed to init ConnectionData instance (libsodium "
                         "init fail) while client establishing connection with ",
-                        UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
+                        UDPC_atostr((UDPC_HContext)this, event.conId.addr),
                         " port ",
-                        optE->conId.port);
+                        event.conId.port);
                     continue;
                 }
                 newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
 
 
-                std::lock_guard<std::mutex> lock(conMapMutex);
-                if(conMap.find(optE->conId) == conMap.end()) {
+                std::lock_guard<std::mutex> conMapLock(conMapMutex);
+                if(conMap.find(event.conId) == conMap.end()) {
                     conMap.insert(std::make_pair(
-                        optE->conId,
+                        event.conId,
                         std::move(newCon)));
-                    auto addrConIter = addrConMap.find(optE->conId.addr);
+                    auto addrConIter = addrConMap.find(event.conId.addr);
                     if(addrConIter == addrConMap.end()) {
                         auto insertResult = addrConMap.insert(std::make_pair(
-                            optE->conId.addr,
+                            event.conId.addr,
                             std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}));
                         assert(insertResult.second &&
                             "new connection insert into addrConMap must not fail");
                         addrConIter = insertResult.first;
                     }
-                    addrConIter->second.insert(optE->conId);
+                    addrConIter->second.insert(event.conId);
                     UDPC_CHECK_LOG(this,
                         UDPC_LoggingType::UDPC_INFO,
                         "Client initiating connection to ",
-                        UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
+                        UDPC_atostr((UDPC_HContext)this, event.conId.addr),
                         " port ",
-                        optE->conId.port,
+                        event.conId.port,
                         " ...");
                 } else {
                     UDPC_CHECK_LOG(this,
                         UDPC_LoggingType::UDPC_WARNING,
                         "Client initiate connection, already connected to peer ",
-                        UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
+                        UDPC_atostr((UDPC_HContext)this, event.conId.addr),
                         " port ",
-                        optE->conId.port);
+                        event.conId.port);
                 }
             }
                 break;
             case UDPC_ET_REQUEST_DISCONNECT:
             {
-                std::lock_guard<std::mutex> lock(conMapMutex);
-                if(optE->v.dropAllWithAddr != 0) {
+                std::lock_guard<std::mutex> conMapLock(conMapMutex);
+                if(event.v.dropAllWithAddr != 0) {
                     // drop all connections with same address
-                    auto addrConIter = addrConMap.find(optE->conId.addr);
+                    auto addrConIter = addrConMap.find(event.conId.addr);
                     if(addrConIter != addrConMap.end()) {
                         for(auto identIter = addrConIter->second.begin();
                                 identIter != addrConIter->second.end();
@@ -367,7 +369,7 @@ void UDPC::Context::update_impl() {
                     }
                 } else {
                     // drop only specific connection with addr and port
-                    auto iter = conMap.find(optE->conId);
+                    auto iter = conMap.find(event.conId);
                     if(iter != conMap.end()) {
                         deletionMap.insert(iter->first);
                     }
@@ -379,12 +381,12 @@ void UDPC::Context::update_impl() {
                 break;
             }
         }
-    } while(!internalEvents.empty());
+    }
 
     {
         // check timed out, check good/bad mode with rtt, remove timed out
         std::vector<UDPC_ConnectionId> removed;
-        std::lock_guard<std::mutex> lock(conMapMutex);
+        std::lock_guard<std::mutex> conMapLock(conMapMutex);
         for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
             temp_dt_fs = now - iter->second.received;
             if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) {
@@ -415,7 +417,8 @@ void UDPC::Context::update_impl() {
                 }
                 iter->second.toggledTimer = std::chrono::steady_clock::duration::zero();
                 if(isReceivingEvents.load()) {
-                    externalEvents.push(UDPC_Event{
+                    std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                    externalEvents.push_back(UDPC_Event{
                         UDPC_ET_BAD_MODE, iter->first, false});
                 }
             } else if(iter->second.flags.test(1)) {
@@ -441,7 +444,8 @@ void UDPC::Context::update_impl() {
                         iter->second.port);
                     iter->second.flags.set(1);
                     if(isReceivingEvents.load()) {
-                        externalEvents.push(UDPC_Event{
+                        std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                        externalEvents.push_back(UDPC_Event{
                             UDPC_ET_GOOD_MODE, iter->first, false});
                     }
                 }
@@ -484,10 +488,12 @@ void UDPC::Context::update_impl() {
             }
             if(isReceivingEvents.load()) {
                 if(flags.test(1) && cIter->second.flags.test(3)) {
-                    externalEvents.push(UDPC_Event{
+                    std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                    externalEvents.push_back(UDPC_Event{
                         UDPC_ET_FAIL_CONNECT, *iter, false});
                 } else {
-                    externalEvents.push(UDPC_Event{
+                    std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                    externalEvents.push_back(UDPC_Event{
                         UDPC_ET_DISCONNECTED, *iter, false});
                 }
             }
@@ -504,7 +510,7 @@ void UDPC::Context::update_impl() {
         while(true) {
             auto next = sendIter.current();
             if(next) {
-                std::lock_guard<std::mutex> lock(conMapMutex);
+                std::lock_guard<std::mutex> conMapLock(conMapMutex);
                 auto iter = conMap.find(next->receiver);
                 if(iter != conMap.end()) {
                     if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
@@ -558,7 +564,7 @@ void UDPC::Context::update_impl() {
 
     // update send (only if triggerSend flag is set)
     {
-        std::lock_guard<std::mutex> lock(conMapMutex);
+        std::lock_guard<std::mutex> conMapLock(conMapMutex);
         for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
             auto delIter = deletionMap.find(iter->first);
             if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
@@ -1030,7 +1036,7 @@ void UDPC::Context::update_impl() {
 
     // remove queued for deletion
     for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
-        std::lock_guard<std::mutex> lock(conMapMutex);
+        std::lock_guard<std::mutex> conMapLock(conMapMutex);
         auto iter = conMap.find(*delIter);
         if(iter != conMap.end()) {
             if(iter->second.flags.test(4)) {
@@ -1045,10 +1051,12 @@ void UDPC::Context::update_impl() {
             }
             if(isReceivingEvents.load()) {
                 if(flags.test(1) && iter->second.flags.test(3)) {
-                    externalEvents.push(UDPC_Event{
+                    std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                    externalEvents.push_back(UDPC_Event{
                         UDPC_ET_FAIL_CONNECT, iter->first, false});
                 } else {
-                    externalEvents.push(UDPC_Event{
+                    std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                    externalEvents.push_back(UDPC_Event{
                         UDPC_ET_DISCONNECTED, iter->first, false});
                 }
             }
@@ -1181,7 +1189,7 @@ void UDPC::Context::update_impl() {
 
     if(isConnect && !isPing) {
         // is connect packet and is accepting new connections
-        std::lock_guard<std::mutex> lock(conMapMutex);
+        std::lock_guard<std::mutex> conMapLock(conMapMutex);
         if(!flags.test(1)
                 && conMap.find(identifier) == conMap.end()
                 && isAcceptNewConnections.load()) {
@@ -1245,7 +1253,7 @@ void UDPC::Context::update_impl() {
                     recvBuf + UDPC_MIN_HEADER_SIZE + 4,
                     crypto_sign_PUBLICKEYBYTES);
                 {
-                    std::lock_guard<std::mutex> lock(peerPKWhitelistMutex);
+                    std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
                     if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(newConnection.peer_pk)) == peerPKWhitelist.end()) {
                         UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
                             "peer_pk is not in whitelist, not establishing "
@@ -1309,7 +1317,8 @@ void UDPC::Context::update_impl() {
             }
             addrConIter->second.insert(identifier);
             if(isReceivingEvents.load()) {
-                externalEvents.push(UDPC_Event{
+                std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                externalEvents.push_back(UDPC_Event{
                     UDPC_ET_CONNECTED,
                     identifier,
                     false});
@@ -1357,7 +1366,7 @@ void UDPC::Context::update_impl() {
                     recvBuf + UDPC_MIN_HEADER_SIZE + 4,
                     crypto_sign_PUBLICKEYBYTES);
                 {
-                    std::lock_guard<std::mutex> lock(peerPKWhitelistMutex);
+                    std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
                     if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(iter->second.peer_pk)) == peerPKWhitelist.end()) {
                         UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
                             "peer_pk is not in whitelist, not establishing "
@@ -1404,7 +1413,8 @@ void UDPC::Context::update_impl() {
                 flags.test(2) && iter->second.flags.test(6) ?
                     ", libsodium enabled" : ", libsodium disabled");
             if(isReceivingEvents.load()) {
-                externalEvents.push(UDPC_Event{
+                std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                externalEvents.push_back(UDPC_Event{
                     UDPC_ET_CONNECTED,
                     identifier,
                     false});
@@ -1413,7 +1423,7 @@ void UDPC::Context::update_impl() {
         return;
     }
 
-    std::lock_guard<std::mutex> lock(conMapMutex);
+    std::lock_guard<std::mutex> conMapLock(conMapMutex);
     auto iter = conMap.find(identifier);
     if(iter == conMap.end() || iter->second.flags.test(3)
             || !iter->second.flags.test(4) || iter->second.id != conID) {
@@ -1481,7 +1491,8 @@ void UDPC::Context::update_impl() {
                 }
             }
             if(isReceivingEvents.load()) {
-                externalEvents.push(UDPC_Event{
+                std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+                externalEvents.push_back(UDPC_Event{
                     UDPC_ET_DISCONNECTED, identifier, false});
             }
             conMap.erase(conIter);
@@ -1642,7 +1653,8 @@ void UDPC::Context::update_impl() {
         recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
         recPktInfo.rtt = durationToMS(iter->second.rtt);
 
-        receivedPkts.push(recPktInfo);
+        std::lock_guard<std::mutex> receivedPktsLock(receivedPktsMutex);
+        receivedPkts.push_back(recPktInfo);
     } else if(pktType == 1 && bytes > (int)UDPC_LSFULL_HEADER_SIZE) {
         UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo();
         recPktInfo.dataSize = bytes - UDPC_LSFULL_HEADER_SIZE;
@@ -1660,7 +1672,8 @@ void UDPC::Context::update_impl() {
         recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
         recPktInfo.rtt = durationToMS(iter->second.rtt);
 
-        receivedPkts.push(recPktInfo);
+        std::lock_guard<std::mutex> receivedPktsLock(receivedPktsMutex);
+        receivedPkts.push_back(recPktInfo);
     } else {
         UDPC_CHECK_LOG(this,
             UDPC_LoggingType::UDPC_VERBOSE,
@@ -2099,7 +2112,8 @@ void UDPC_client_initiate_connection(
     }
 #endif
 
-    c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium});
+    std::lock_guard<std::mutex> intEvLock(c->internalEventsMutex);
+    c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium});
 }
 
 void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
@@ -2140,7 +2154,7 @@ unsigned long UDPC_get_queued_size(UDPC_HContext ctx, UDPC_ConnectionId id, int
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->conMapMutex);
+    std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
     auto iter = c->conMap.find(id);
     if(iter != c->conMap.end()) {
         if(exists) {
@@ -2173,7 +2187,8 @@ void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int
         return;
     }
 
-    c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
+    std::lock_guard<std::mutex> intEvLock(c->internalEventsMutex);
+    c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
     return;
 }
 
@@ -2183,7 +2198,7 @@ int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->conMapMutex);
+    std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
 
     return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
 }
@@ -2194,7 +2209,7 @@ UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size
         return nullptr;
     }
 
-    std::lock_guard<std::mutex> lock(c->conMapMutex);
+    std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
 
     if(c->conMap.empty()) {
         if(size) {
@@ -2280,11 +2295,15 @@ UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining) {
         return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
     }
 
-    auto optE = c->externalEvents.top_and_pop_and_rsize(remaining);
-    if(optE) {
-        return *optE;
-    } else {
+    std::lock_guard<std::mutex> extEvLock(c->externalEventsMutex);
+    if(c->externalEvents.empty()) {
+        if(remaining) { *remaining = 0; }
         return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
+    } else {
+        auto event = c->externalEvents.front();
+        c->externalEvents.pop_front();
+        if(remaining) { *remaining = c->externalEvents.size(); }
+        return event;
     }
 }
 
@@ -2294,11 +2313,16 @@ UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) {
         return UDPC::get_empty_pinfo();
     }
 
-    auto opt_pinfo = c->receivedPkts.top_and_pop_and_rsize(remaining);
-    if(opt_pinfo) {
-        return *opt_pinfo;
+    std::lock_guard<std::mutex> receivedPktsLock(c->receivedPktsMutex);
+    if(c->receivedPkts.empty()) {
+        if(remaining) { *remaining = 0; }
+        return UDPC::get_empty_pinfo();
+    } else {
+        auto pinfo = c->receivedPkts.front();
+        c->receivedPkts.pop_front();
+        if(remaining) { *remaining = c->receivedPkts.size(); }
+        return pinfo;
     }
-    return UDPC::get_empty_pinfo();
 }
 
 int UDPC_set_libsodium_keys(UDPC_HContext ctx, unsigned char *sk, unsigned char *pk) {
@@ -2340,7 +2364,7 @@ int UDPC_add_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) {
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+    std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
     auto result = c->peerPKWhitelist.insert(UDPC::PKContainer(pk));
     if(result.second) {
         return c->peerPKWhitelist.size();
@@ -2354,7 +2378,7 @@ int UDPC_has_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) {
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+    std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
     if(c->peerPKWhitelist.find(UDPC::PKContainer(pk)) != c->peerPKWhitelist.end()) {
         return 1;
     }
@@ -2367,7 +2391,7 @@ int UDPC_remove_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) {
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+    std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
     if(c->peerPKWhitelist.erase(UDPC::PKContainer(pk)) != 0) {
         return 1;
     }
@@ -2380,7 +2404,7 @@ int UDPC_clear_whitelist(UDPC_HContext ctx) {
         return 0;
     }
 
-    std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+    std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
     c->peerPKWhitelist.clear();
     return 1;
 }