Replace TSLQueue with std::deque where possible

Also added std::mutex for each new std::deque. cSendPkts is left as a
TSLQueue because it needs to support fast removal from the middle of the
data structure (mainly because the queued packets per ConnectionData has
an imposed limit of packets to hold).
This commit is contained in:
Stephen Seo 2020-01-08 19:55:12 +09:00
parent e66e6c7b74
commit 4c48dbb0cf
2 changed files with 91 additions and 64 deletions

View file

@ -235,12 +235,15 @@ public:
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher> deletionMap;
std::unordered_set<PKContainer, PKContainer> peerPKWhitelist;
TSLQueue<UDPC_PacketInfo> receivedPkts;
std::deque<UDPC_PacketInfo> receivedPkts;
std::mutex receivedPktsMutex;
TSLQueue<UDPC_PacketInfo> cSendPkts;
// handled internally
TSLQueue<UDPC_Event> internalEvents;
std::deque<UDPC_Event> internalEvents;
std::mutex internalEventsMutex;
// handled via interface, if isReceivingEvents is true
TSLQueue<UDPC_Event> externalEvents;
std::deque<UDPC_Event> externalEvents;
std::mutex externalEventsMutex;
std::default_random_engine rng_engine;

View file

@ -279,10 +279,12 @@ void UDPC::Context::update_impl() {
lastUpdated = now;
// handle internalEvents
do {
auto optE = internalEvents.top_and_pop();
if(optE) {
switch(optE->type) {
{
std::lock_guard<std::mutex> intEvLock(internalEventsMutex);
while(!internalEvents.empty()) {
auto event = internalEvents.front();
internalEvents.pop_front();
switch(event.type) {
case UDPC_ET_REQUEST_CONNECT:
{
unsigned char *sk = nullptr;
@ -294,11 +296,11 @@ void UDPC::Context::update_impl() {
UDPC::ConnectionData newCon(
false,
this,
optE->conId.addr,
optE->conId.scope_id,
optE->conId.port,
event.conId.addr,
event.conId.scope_id,
event.conId.port,
#ifdef UDPC_LIBSODIUM_ENABLED
flags.test(2) && optE->v.enableLibSodium != 0,
flags.test(2) && event.v.enableLibSodium != 0,
sk, pk);
#else
false,
@ -309,52 +311,52 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::UDPC_ERROR,
"Failed to init ConnectionData instance (libsodium "
"init fail) while client establishing connection with ",
UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
UDPC_atostr((UDPC_HContext)this, event.conId.addr),
" port ",
optE->conId.port);
event.conId.port);
continue;
}
newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
std::lock_guard<std::mutex> lock(conMapMutex);
if(conMap.find(optE->conId) == conMap.end()) {
std::lock_guard<std::mutex> conMapLock(conMapMutex);
if(conMap.find(event.conId) == conMap.end()) {
conMap.insert(std::make_pair(
optE->conId,
event.conId,
std::move(newCon)));
auto addrConIter = addrConMap.find(optE->conId.addr);
auto addrConIter = addrConMap.find(event.conId.addr);
if(addrConIter == addrConMap.end()) {
auto insertResult = addrConMap.insert(std::make_pair(
optE->conId.addr,
event.conId.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}));
assert(insertResult.second &&
"new connection insert into addrConMap must not fail");
addrConIter = insertResult.first;
}
addrConIter->second.insert(optE->conId);
addrConIter->second.insert(event.conId);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
"Client initiating connection to ",
UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
UDPC_atostr((UDPC_HContext)this, event.conId.addr),
" port ",
optE->conId.port,
event.conId.port,
" ...");
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"Client initiate connection, already connected to peer ",
UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
UDPC_atostr((UDPC_HContext)this, event.conId.addr),
" port ",
optE->conId.port);
event.conId.port);
}
}
break;
case UDPC_ET_REQUEST_DISCONNECT:
{
std::lock_guard<std::mutex> lock(conMapMutex);
if(optE->v.dropAllWithAddr != 0) {
std::lock_guard<std::mutex> conMapLock(conMapMutex);
if(event.v.dropAllWithAddr != 0) {
// drop all connections with same address
auto addrConIter = addrConMap.find(optE->conId.addr);
auto addrConIter = addrConMap.find(event.conId.addr);
if(addrConIter != addrConMap.end()) {
for(auto identIter = addrConIter->second.begin();
identIter != addrConIter->second.end();
@ -367,7 +369,7 @@ void UDPC::Context::update_impl() {
}
} else {
// drop only specific connection with addr and port
auto iter = conMap.find(optE->conId);
auto iter = conMap.find(event.conId);
if(iter != conMap.end()) {
deletionMap.insert(iter->first);
}
@ -379,12 +381,12 @@ void UDPC::Context::update_impl() {
break;
}
}
} while(!internalEvents.empty());
}
{
// check timed out, check good/bad mode with rtt, remove timed out
std::vector<UDPC_ConnectionId> removed;
std::lock_guard<std::mutex> lock(conMapMutex);
std::lock_guard<std::mutex> conMapLock(conMapMutex);
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
temp_dt_fs = now - iter->second.received;
if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) {
@ -415,7 +417,8 @@ void UDPC::Context::update_impl() {
}
iter->second.toggledTimer = std::chrono::steady_clock::duration::zero();
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_BAD_MODE, iter->first, false});
}
} else if(iter->second.flags.test(1)) {
@ -441,7 +444,8 @@ void UDPC::Context::update_impl() {
iter->second.port);
iter->second.flags.set(1);
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_GOOD_MODE, iter->first, false});
}
}
@ -484,10 +488,12 @@ void UDPC::Context::update_impl() {
}
if(isReceivingEvents.load()) {
if(flags.test(1) && cIter->second.flags.test(3)) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_FAIL_CONNECT, *iter, false});
} else {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_DISCONNECTED, *iter, false});
}
}
@ -504,7 +510,7 @@ void UDPC::Context::update_impl() {
while(true) {
auto next = sendIter.current();
if(next) {
std::lock_guard<std::mutex> lock(conMapMutex);
std::lock_guard<std::mutex> conMapLock(conMapMutex);
auto iter = conMap.find(next->receiver);
if(iter != conMap.end()) {
if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
@ -558,7 +564,7 @@ void UDPC::Context::update_impl() {
// update send (only if triggerSend flag is set)
{
std::lock_guard<std::mutex> lock(conMapMutex);
std::lock_guard<std::mutex> conMapLock(conMapMutex);
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
auto delIter = deletionMap.find(iter->first);
if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
@ -1030,7 +1036,7 @@ void UDPC::Context::update_impl() {
// remove queued for deletion
for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
std::lock_guard<std::mutex> lock(conMapMutex);
std::lock_guard<std::mutex> conMapLock(conMapMutex);
auto iter = conMap.find(*delIter);
if(iter != conMap.end()) {
if(iter->second.flags.test(4)) {
@ -1045,10 +1051,12 @@ void UDPC::Context::update_impl() {
}
if(isReceivingEvents.load()) {
if(flags.test(1) && iter->second.flags.test(3)) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_FAIL_CONNECT, iter->first, false});
} else {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_DISCONNECTED, iter->first, false});
}
}
@ -1181,7 +1189,7 @@ void UDPC::Context::update_impl() {
if(isConnect && !isPing) {
// is connect packet and is accepting new connections
std::lock_guard<std::mutex> lock(conMapMutex);
std::lock_guard<std::mutex> conMapLock(conMapMutex);
if(!flags.test(1)
&& conMap.find(identifier) == conMap.end()
&& isAcceptNewConnections.load()) {
@ -1245,7 +1253,7 @@ void UDPC::Context::update_impl() {
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
{
std::lock_guard<std::mutex> lock(peerPKWhitelistMutex);
std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(newConnection.peer_pk)) == peerPKWhitelist.end()) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"peer_pk is not in whitelist, not establishing "
@ -1309,7 +1317,8 @@ void UDPC::Context::update_impl() {
}
addrConIter->second.insert(identifier);
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_CONNECTED,
identifier,
false});
@ -1357,7 +1366,7 @@ void UDPC::Context::update_impl() {
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
{
std::lock_guard<std::mutex> lock(peerPKWhitelistMutex);
std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(iter->second.peer_pk)) == peerPKWhitelist.end()) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"peer_pk is not in whitelist, not establishing "
@ -1404,7 +1413,8 @@ void UDPC::Context::update_impl() {
flags.test(2) && iter->second.flags.test(6) ?
", libsodium enabled" : ", libsodium disabled");
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_CONNECTED,
identifier,
false});
@ -1413,7 +1423,7 @@ void UDPC::Context::update_impl() {
return;
}
std::lock_guard<std::mutex> lock(conMapMutex);
std::lock_guard<std::mutex> conMapLock(conMapMutex);
auto iter = conMap.find(identifier);
if(iter == conMap.end() || iter->second.flags.test(3)
|| !iter->second.flags.test(4) || iter->second.id != conID) {
@ -1481,7 +1491,8 @@ void UDPC::Context::update_impl() {
}
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
externalEvents.push_back(UDPC_Event{
UDPC_ET_DISCONNECTED, identifier, false});
}
conMap.erase(conIter);
@ -1642,7 +1653,8 @@ void UDPC::Context::update_impl() {
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
recPktInfo.rtt = durationToMS(iter->second.rtt);
receivedPkts.push(recPktInfo);
std::lock_guard<std::mutex> receivedPktsLock(receivedPktsMutex);
receivedPkts.push_back(recPktInfo);
} else if(pktType == 1 && bytes > (int)UDPC_LSFULL_HEADER_SIZE) {
UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo();
recPktInfo.dataSize = bytes - UDPC_LSFULL_HEADER_SIZE;
@ -1660,7 +1672,8 @@ void UDPC::Context::update_impl() {
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
recPktInfo.rtt = durationToMS(iter->second.rtt);
receivedPkts.push(recPktInfo);
std::lock_guard<std::mutex> receivedPktsLock(receivedPktsMutex);
receivedPkts.push_back(recPktInfo);
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
@ -2099,7 +2112,8 @@ void UDPC_client_initiate_connection(
}
#endif
c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium});
std::lock_guard<std::mutex> intEvLock(c->internalEventsMutex);
c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium});
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
@ -2140,7 +2154,7 @@ unsigned long UDPC_get_queued_size(UDPC_HContext ctx, UDPC_ConnectionId id, int
return 0;
}
std::lock_guard<std::mutex> lock(c->conMapMutex);
std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
auto iter = c->conMap.find(id);
if(iter != c->conMap.end()) {
if(exists) {
@ -2173,7 +2187,8 @@ void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int
return;
}
c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
std::lock_guard<std::mutex> intEvLock(c->internalEventsMutex);
c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
return;
}
@ -2183,7 +2198,7 @@ int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
return 0;
}
std::lock_guard<std::mutex> lock(c->conMapMutex);
std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
}
@ -2194,7 +2209,7 @@ UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size
return nullptr;
}
std::lock_guard<std::mutex> lock(c->conMapMutex);
std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
if(c->conMap.empty()) {
if(size) {
@ -2280,11 +2295,15 @@ UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining) {
return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
}
auto optE = c->externalEvents.top_and_pop_and_rsize(remaining);
if(optE) {
return *optE;
} else {
std::lock_guard<std::mutex> extEvLock(c->externalEventsMutex);
if(c->externalEvents.empty()) {
if(remaining) { *remaining = 0; }
return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
} else {
auto event = c->externalEvents.front();
c->externalEvents.pop_front();
if(remaining) { *remaining = c->externalEvents.size(); }
return event;
}
}
@ -2294,11 +2313,16 @@ UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) {
return UDPC::get_empty_pinfo();
}
auto opt_pinfo = c->receivedPkts.top_and_pop_and_rsize(remaining);
if(opt_pinfo) {
return *opt_pinfo;
std::lock_guard<std::mutex> receivedPktsLock(c->receivedPktsMutex);
if(c->receivedPkts.empty()) {
if(remaining) { *remaining = 0; }
return UDPC::get_empty_pinfo();
} else {
auto pinfo = c->receivedPkts.front();
c->receivedPkts.pop_front();
if(remaining) { *remaining = c->receivedPkts.size(); }
return pinfo;
}
return UDPC::get_empty_pinfo();
}
int UDPC_set_libsodium_keys(UDPC_HContext ctx, unsigned char *sk, unsigned char *pk) {
@ -2340,7 +2364,7 @@ int UDPC_add_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) {
return 0;
}
std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
auto result = c->peerPKWhitelist.insert(UDPC::PKContainer(pk));
if(result.second) {
return c->peerPKWhitelist.size();
@ -2354,7 +2378,7 @@ int UDPC_has_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) {
return 0;
}
std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
if(c->peerPKWhitelist.find(UDPC::PKContainer(pk)) != c->peerPKWhitelist.end()) {
return 1;
}
@ -2367,7 +2391,7 @@ int UDPC_remove_whitelist_pk(UDPC_HContext ctx, unsigned char *pk) {
return 0;
}
std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
if(c->peerPKWhitelist.erase(UDPC::PKContainer(pk)) != 0) {
return 1;
}
@ -2380,7 +2404,7 @@ int UDPC_clear_whitelist(UDPC_HContext ctx) {
return 0;
}
std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
c->peerPKWhitelist.clear();
return 1;
}