From 4c48dbb0cf65366ccfca1d17652cc43c8cce5dfa Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Wed, 8 Jan 2020 19:55:12 +0900 Subject: [PATCH] Replace TSLQueue with std::deque where possible Also added std::mutex for each new std::deque. cSendPkts is left as a TSLQueue because it needs to support fast removal from the middle of the data structure (mainly because the queued packets per ConnectionData has an imposed limit of packets to hold). --- src/UDPC_Defines.hpp | 9 ++- src/UDPConnection.cpp | 146 ++++++++++++++++++++++++------------------ 2 files changed, 91 insertions(+), 64 deletions(-) diff --git a/src/UDPC_Defines.hpp b/src/UDPC_Defines.hpp index aa41356..2114cbc 100644 --- a/src/UDPC_Defines.hpp +++ b/src/UDPC_Defines.hpp @@ -235,12 +235,15 @@ public: std::unordered_map idMap; std::unordered_set deletionMap; std::unordered_set peerPKWhitelist; - TSLQueue receivedPkts; + std::deque receivedPkts; + std::mutex receivedPktsMutex; TSLQueue cSendPkts; // handled internally - TSLQueue internalEvents; + std::deque internalEvents; + std::mutex internalEventsMutex; // handled via interface, if isReceivingEvents is true - TSLQueue externalEvents; + std::deque externalEvents; + std::mutex externalEventsMutex; std::default_random_engine rng_engine; diff --git a/src/UDPConnection.cpp b/src/UDPConnection.cpp index 3f100b3..1c5a6a4 100644 --- a/src/UDPConnection.cpp +++ b/src/UDPConnection.cpp @@ -279,10 +279,12 @@ void UDPC::Context::update_impl() { lastUpdated = now; // handle internalEvents - do { - auto optE = internalEvents.top_and_pop(); - if(optE) { - switch(optE->type) { + { + std::lock_guard intEvLock(internalEventsMutex); + while(!internalEvents.empty()) { + auto event = internalEvents.front(); + internalEvents.pop_front(); + switch(event.type) { case UDPC_ET_REQUEST_CONNECT: { unsigned char *sk = nullptr; @@ -294,11 +296,11 @@ void UDPC::Context::update_impl() { UDPC::ConnectionData newCon( false, this, - optE->conId.addr, - optE->conId.scope_id, - optE->conId.port, + event.conId.addr, + event.conId.scope_id, + event.conId.port, #ifdef UDPC_LIBSODIUM_ENABLED - flags.test(2) && optE->v.enableLibSodium != 0, + flags.test(2) && event.v.enableLibSodium != 0, sk, pk); #else false, @@ -309,52 +311,52 @@ void UDPC::Context::update_impl() { UDPC_LoggingType::UDPC_ERROR, "Failed to init ConnectionData instance (libsodium " "init fail) while client establishing connection with ", - UDPC_atostr((UDPC_HContext)this, optE->conId.addr), + UDPC_atostr((UDPC_HContext)this, event.conId.addr), " port ", - optE->conId.port); + event.conId.port); continue; } newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT; - std::lock_guard lock(conMapMutex); - if(conMap.find(optE->conId) == conMap.end()) { + std::lock_guard conMapLock(conMapMutex); + if(conMap.find(event.conId) == conMap.end()) { conMap.insert(std::make_pair( - optE->conId, + event.conId, std::move(newCon))); - auto addrConIter = addrConMap.find(optE->conId.addr); + auto addrConIter = addrConMap.find(event.conId.addr); if(addrConIter == addrConMap.end()) { auto insertResult = addrConMap.insert(std::make_pair( - optE->conId.addr, + event.conId.addr, std::unordered_set{})); assert(insertResult.second && "new connection insert into addrConMap must not fail"); addrConIter = insertResult.first; } - addrConIter->second.insert(optE->conId); + addrConIter->second.insert(event.conId); UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Client initiating connection to ", - UDPC_atostr((UDPC_HContext)this, optE->conId.addr), + UDPC_atostr((UDPC_HContext)this, event.conId.addr), " port ", - optE->conId.port, + event.conId.port, " ..."); } else { UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING, "Client initiate connection, already connected to peer ", - UDPC_atostr((UDPC_HContext)this, optE->conId.addr), + UDPC_atostr((UDPC_HContext)this, event.conId.addr), " port ", - optE->conId.port); + event.conId.port); } } break; case UDPC_ET_REQUEST_DISCONNECT: { - std::lock_guard lock(conMapMutex); - if(optE->v.dropAllWithAddr != 0) { + std::lock_guard conMapLock(conMapMutex); + if(event.v.dropAllWithAddr != 0) { // drop all connections with same address - auto addrConIter = addrConMap.find(optE->conId.addr); + auto addrConIter = addrConMap.find(event.conId.addr); if(addrConIter != addrConMap.end()) { for(auto identIter = addrConIter->second.begin(); identIter != addrConIter->second.end(); @@ -367,7 +369,7 @@ void UDPC::Context::update_impl() { } } else { // drop only specific connection with addr and port - auto iter = conMap.find(optE->conId); + auto iter = conMap.find(event.conId); if(iter != conMap.end()) { deletionMap.insert(iter->first); } @@ -379,12 +381,12 @@ void UDPC::Context::update_impl() { break; } } - } while(!internalEvents.empty()); + } { // check timed out, check good/bad mode with rtt, remove timed out std::vector removed; - std::lock_guard lock(conMapMutex); + std::lock_guard conMapLock(conMapMutex); for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { temp_dt_fs = now - iter->second.received; if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) { @@ -415,7 +417,8 @@ void UDPC::Context::update_impl() { } iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); if(isReceivingEvents.load()) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_BAD_MODE, iter->first, false}); } } else if(iter->second.flags.test(1)) { @@ -441,7 +444,8 @@ void UDPC::Context::update_impl() { iter->second.port); iter->second.flags.set(1); if(isReceivingEvents.load()) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_GOOD_MODE, iter->first, false}); } } @@ -484,10 +488,12 @@ void UDPC::Context::update_impl() { } if(isReceivingEvents.load()) { if(flags.test(1) && cIter->second.flags.test(3)) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_FAIL_CONNECT, *iter, false}); } else { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_DISCONNECTED, *iter, false}); } } @@ -504,7 +510,7 @@ void UDPC::Context::update_impl() { while(true) { auto next = sendIter.current(); if(next) { - std::lock_guard lock(conMapMutex); + std::lock_guard conMapLock(conMapMutex); auto iter = conMap.find(next->receiver); if(iter != conMap.end()) { if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) { @@ -558,7 +564,7 @@ void UDPC::Context::update_impl() { // update send (only if triggerSend flag is set) { - std::lock_guard lock(conMapMutex); + std::lock_guard conMapLock(conMapMutex); for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { auto delIter = deletionMap.find(iter->first); if(!iter->second.flags.test(0) && delIter == deletionMap.end()) { @@ -1030,7 +1036,7 @@ void UDPC::Context::update_impl() { // remove queued for deletion for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) { - std::lock_guard lock(conMapMutex); + std::lock_guard conMapLock(conMapMutex); auto iter = conMap.find(*delIter); if(iter != conMap.end()) { if(iter->second.flags.test(4)) { @@ -1045,10 +1051,12 @@ void UDPC::Context::update_impl() { } if(isReceivingEvents.load()) { if(flags.test(1) && iter->second.flags.test(3)) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_FAIL_CONNECT, iter->first, false}); } else { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_DISCONNECTED, iter->first, false}); } } @@ -1181,7 +1189,7 @@ void UDPC::Context::update_impl() { if(isConnect && !isPing) { // is connect packet and is accepting new connections - std::lock_guard lock(conMapMutex); + std::lock_guard conMapLock(conMapMutex); if(!flags.test(1) && conMap.find(identifier) == conMap.end() && isAcceptNewConnections.load()) { @@ -1245,7 +1253,7 @@ void UDPC::Context::update_impl() { recvBuf + UDPC_MIN_HEADER_SIZE + 4, crypto_sign_PUBLICKEYBYTES); { - std::lock_guard lock(peerPKWhitelistMutex); + std::lock_guard pkWhitelistLock(peerPKWhitelistMutex); if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(newConnection.peer_pk)) == peerPKWhitelist.end()) { UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING, "peer_pk is not in whitelist, not establishing " @@ -1309,7 +1317,8 @@ void UDPC::Context::update_impl() { } addrConIter->second.insert(identifier); if(isReceivingEvents.load()) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_CONNECTED, identifier, false}); @@ -1357,7 +1366,7 @@ void UDPC::Context::update_impl() { recvBuf + UDPC_MIN_HEADER_SIZE + 4, crypto_sign_PUBLICKEYBYTES); { - std::lock_guard lock(peerPKWhitelistMutex); + std::lock_guard pkWhitelistLock(peerPKWhitelistMutex); if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(iter->second.peer_pk)) == peerPKWhitelist.end()) { UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING, "peer_pk is not in whitelist, not establishing " @@ -1404,7 +1413,8 @@ void UDPC::Context::update_impl() { flags.test(2) && iter->second.flags.test(6) ? ", libsodium enabled" : ", libsodium disabled"); if(isReceivingEvents.load()) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_CONNECTED, identifier, false}); @@ -1413,7 +1423,7 @@ void UDPC::Context::update_impl() { return; } - std::lock_guard lock(conMapMutex); + std::lock_guard conMapLock(conMapMutex); auto iter = conMap.find(identifier); if(iter == conMap.end() || iter->second.flags.test(3) || !iter->second.flags.test(4) || iter->second.id != conID) { @@ -1481,7 +1491,8 @@ void UDPC::Context::update_impl() { } } if(isReceivingEvents.load()) { - externalEvents.push(UDPC_Event{ + std::lock_guard extEvLock(externalEventsMutex); + externalEvents.push_back(UDPC_Event{ UDPC_ET_DISCONNECTED, identifier, false}); } conMap.erase(conIter); @@ -1642,7 +1653,8 @@ void UDPC::Context::update_impl() { recPktInfo.receiver.port = ntohs(socketInfo.sin6_port); recPktInfo.rtt = durationToMS(iter->second.rtt); - receivedPkts.push(recPktInfo); + std::lock_guard receivedPktsLock(receivedPktsMutex); + receivedPkts.push_back(recPktInfo); } else if(pktType == 1 && bytes > (int)UDPC_LSFULL_HEADER_SIZE) { UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo(); recPktInfo.dataSize = bytes - UDPC_LSFULL_HEADER_SIZE; @@ -1660,7 +1672,8 @@ void UDPC::Context::update_impl() { recPktInfo.receiver.port = ntohs(socketInfo.sin6_port); recPktInfo.rtt = durationToMS(iter->second.rtt); - receivedPkts.push(recPktInfo); + std::lock_guard receivedPktsLock(receivedPktsMutex); + receivedPkts.push_back(recPktInfo); } else { UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_VERBOSE, @@ -2099,7 +2112,8 @@ void UDPC_client_initiate_connection( } #endif - c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium}); + std::lock_guard intEvLock(c->internalEventsMutex); + c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium}); } void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, @@ -2140,7 +2154,7 @@ unsigned long UDPC_get_queued_size(UDPC_HContext ctx, UDPC_ConnectionId id, int return 0; } - std::lock_guard lock(c->conMapMutex); + std::lock_guard conMapLock(c->conMapMutex); auto iter = c->conMap.find(id); if(iter != c->conMap.end()) { if(exists) { @@ -2173,7 +2187,8 @@ void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int return; } - c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr}); + std::lock_guard intEvLock(c->internalEventsMutex); + c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr}); return; } @@ -2183,7 +2198,7 @@ int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { return 0; } - std::lock_guard lock(c->conMapMutex); + std::lock_guard conMapLock(c->conMapMutex); return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1; } @@ -2194,7 +2209,7 @@ UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size return nullptr; } - std::lock_guard lock(c->conMapMutex); + std::lock_guard conMapLock(c->conMapMutex); if(c->conMap.empty()) { if(size) { @@ -2280,11 +2295,15 @@ UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining) { return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0}; } - auto optE = c->externalEvents.top_and_pop_and_rsize(remaining); - if(optE) { - return *optE; - } else { + std::lock_guard extEvLock(c->externalEventsMutex); + if(c->externalEvents.empty()) { + if(remaining) { *remaining = 0; } return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0}; + } else { + auto event = c->externalEvents.front(); + c->externalEvents.pop_front(); + if(remaining) { *remaining = c->externalEvents.size(); } + return event; } } @@ -2294,11 +2313,16 @@ UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) { return UDPC::get_empty_pinfo(); } - auto opt_pinfo = c->receivedPkts.top_and_pop_and_rsize(remaining); - if(opt_pinfo) { - return *opt_pinfo; + std::lock_guard receivedPktsLock(c->receivedPktsMutex); + if(c->receivedPkts.empty()) { + if(remaining) { *remaining = 0; } + return UDPC::get_empty_pinfo(); + } else { + auto pinfo = c->receivedPkts.front(); + c->receivedPkts.pop_front(); + if(remaining) { *remaining = c->receivedPkts.size(); } + return pinfo; } - return UDPC::get_empty_pinfo(); } int UDPC_set_libsodium_keys(UDPC_HContext ctx, unsigned char *sk, unsigned char *pk) { @@ -2340,7 +2364,7 @@ int UDPC_add_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) { return 0; } - std::lock_guard lock(c->peerPKWhitelistMutex); + std::lock_guard pkWhitelistLock(c->peerPKWhitelistMutex); auto result = c->peerPKWhitelist.insert(UDPC::PKContainer(pk)); if(result.second) { return c->peerPKWhitelist.size(); @@ -2354,7 +2378,7 @@ int UDPC_has_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) { return 0; } - std::lock_guard lock(c->peerPKWhitelistMutex); + std::lock_guard pkWhitelistLock(c->peerPKWhitelistMutex); if(c->peerPKWhitelist.find(UDPC::PKContainer(pk)) != c->peerPKWhitelist.end()) { return 1; } @@ -2367,7 +2391,7 @@ int UDPC_remove_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) { return 0; } - std::lock_guard lock(c->peerPKWhitelistMutex); + std::lock_guard pkWhitelistLock(c->peerPKWhitelistMutex); if(c->peerPKWhitelist.erase(UDPC::PKContainer(pk)) != 0) { return 1; } @@ -2380,7 +2404,7 @@ int UDPC_clear_whitelist(UDPC_HContext ctx) { return 0; } - std::lock_guard lock(c->peerPKWhitelistMutex); + std::lock_guard pkWhitelistLock(c->peerPKWhitelistMutex); c->peerPKWhitelist.clear(); return 1; }