lastUpdated = now;
// handle internalEvents
- do {
- auto optE = internalEvents.top_and_pop();
- if(optE) {
- switch(optE->type) {
+ {
+ std::lock_guard<std::mutex> intEvLock(internalEventsMutex);
+ while(!internalEvents.empty()) {
+ auto event = internalEvents.front();
+ internalEvents.pop_front();
+ switch(event.type) {
case UDPC_ET_REQUEST_CONNECT:
{
unsigned char *sk = nullptr;
UDPC::ConnectionData newCon(
false,
this,
- optE->conId.addr,
- optE->conId.scope_id,
- optE->conId.port,
+ event.conId.addr,
+ event.conId.scope_id,
+ event.conId.port,
#ifdef UDPC_LIBSODIUM_ENABLED
- flags.test(2) && optE->v.enableLibSodium != 0,
+ flags.test(2) && event.v.enableLibSodium != 0,
sk, pk);
#else
false,
UDPC_LoggingType::UDPC_ERROR,
"Failed to init ConnectionData instance (libsodium "
"init fail) while client establishing connection with ",
- UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
+ UDPC_atostr((UDPC_HContext)this, event.conId.addr),
" port ",
- optE->conId.port);
+ event.conId.port);
continue;
}
newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
- std::lock_guard<std::mutex> lock(conMapMutex);
- if(conMap.find(optE->conId) == conMap.end()) {
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
+ if(conMap.find(event.conId) == conMap.end()) {
conMap.insert(std::make_pair(
- optE->conId,
+ event.conId,
std::move(newCon)));
- auto addrConIter = addrConMap.find(optE->conId.addr);
+ auto addrConIter = addrConMap.find(event.conId.addr);
if(addrConIter == addrConMap.end()) {
auto insertResult = addrConMap.insert(std::make_pair(
- optE->conId.addr,
+ event.conId.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}));
assert(insertResult.second &&
"new connection insert into addrConMap must not fail");
addrConIter = insertResult.first;
}
- addrConIter->second.insert(optE->conId);
+ addrConIter->second.insert(event.conId);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
"Client initiating connection to ",
- UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
+ UDPC_atostr((UDPC_HContext)this, event.conId.addr),
" port ",
- optE->conId.port,
+ event.conId.port,
" ...");
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"Client initiate connection, already connected to peer ",
- UDPC_atostr((UDPC_HContext)this, optE->conId.addr),
+ UDPC_atostr((UDPC_HContext)this, event.conId.addr),
" port ",
- optE->conId.port);
+ event.conId.port);
}
}
break;
case UDPC_ET_REQUEST_DISCONNECT:
{
- std::lock_guard<std::mutex> lock(conMapMutex);
- if(optE->v.dropAllWithAddr != 0) {
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
+ if(event.v.dropAllWithAddr != 0) {
// drop all connections with same address
- auto addrConIter = addrConMap.find(optE->conId.addr);
+ auto addrConIter = addrConMap.find(event.conId.addr);
if(addrConIter != addrConMap.end()) {
for(auto identIter = addrConIter->second.begin();
identIter != addrConIter->second.end();
}
} else {
// drop only specific connection with addr and port
- auto iter = conMap.find(optE->conId);
+ auto iter = conMap.find(event.conId);
if(iter != conMap.end()) {
deletionMap.insert(iter->first);
}
break;
}
}
- } while(!internalEvents.empty());
+ }
{
// check timed out, check good/bad mode with rtt, remove timed out
std::vector<UDPC_ConnectionId> removed;
- std::lock_guard<std::mutex> lock(conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
temp_dt_fs = now - iter->second.received;
if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) {
}
iter->second.toggledTimer = std::chrono::steady_clock::duration::zero();
if(isReceivingEvents.load()) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_BAD_MODE, iter->first, false});
}
} else if(iter->second.flags.test(1)) {
iter->second.port);
iter->second.flags.set(1);
if(isReceivingEvents.load()) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_GOOD_MODE, iter->first, false});
}
}
}
if(isReceivingEvents.load()) {
if(flags.test(1) && cIter->second.flags.test(3)) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_FAIL_CONNECT, *iter, false});
} else {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_DISCONNECTED, *iter, false});
}
}
while(true) {
auto next = sendIter.current();
if(next) {
- std::lock_guard<std::mutex> lock(conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
auto iter = conMap.find(next->receiver);
if(iter != conMap.end()) {
if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
// update send (only if triggerSend flag is set)
{
- std::lock_guard<std::mutex> lock(conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
auto delIter = deletionMap.find(iter->first);
if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
// remove queued for deletion
for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
- std::lock_guard<std::mutex> lock(conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
auto iter = conMap.find(*delIter);
if(iter != conMap.end()) {
if(iter->second.flags.test(4)) {
}
if(isReceivingEvents.load()) {
if(flags.test(1) && iter->second.flags.test(3)) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_FAIL_CONNECT, iter->first, false});
} else {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_DISCONNECTED, iter->first, false});
}
}
if(isConnect && !isPing) {
// is connect packet and is accepting new connections
- std::lock_guard<std::mutex> lock(conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
if(!flags.test(1)
&& conMap.find(identifier) == conMap.end()
&& isAcceptNewConnections.load()) {
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
{
- std::lock_guard<std::mutex> lock(peerPKWhitelistMutex);
+ std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(newConnection.peer_pk)) == peerPKWhitelist.end()) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"peer_pk is not in whitelist, not establishing "
}
addrConIter->second.insert(identifier);
if(isReceivingEvents.load()) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_CONNECTED,
identifier,
false});
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
{
- std::lock_guard<std::mutex> lock(peerPKWhitelistMutex);
+ std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(iter->second.peer_pk)) == peerPKWhitelist.end()) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"peer_pk is not in whitelist, not establishing "
flags.test(2) && iter->second.flags.test(6) ?
", libsodium enabled" : ", libsodium disabled");
if(isReceivingEvents.load()) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_CONNECTED,
identifier,
false});
return;
}
- std::lock_guard<std::mutex> lock(conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(conMapMutex);
auto iter = conMap.find(identifier);
if(iter == conMap.end() || iter->second.flags.test(3)
|| !iter->second.flags.test(4) || iter->second.id != conID) {
}
}
if(isReceivingEvents.load()) {
- externalEvents.push(UDPC_Event{
+ std::lock_guard<std::mutex> extEvLock(externalEventsMutex);
+ externalEvents.push_back(UDPC_Event{
UDPC_ET_DISCONNECTED, identifier, false});
}
conMap.erase(conIter);
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
recPktInfo.rtt = durationToMS(iter->second.rtt);
- receivedPkts.push(recPktInfo);
+ std::lock_guard<std::mutex> receivedPktsLock(receivedPktsMutex);
+ receivedPkts.push_back(recPktInfo);
} else if(pktType == 1 && bytes > (int)UDPC_LSFULL_HEADER_SIZE) {
UDPC_PacketInfo recPktInfo = UDPC::get_empty_pinfo();
recPktInfo.dataSize = bytes - UDPC_LSFULL_HEADER_SIZE;
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
recPktInfo.rtt = durationToMS(iter->second.rtt);
- receivedPkts.push(recPktInfo);
+ std::lock_guard<std::mutex> receivedPktsLock(receivedPktsMutex);
+ receivedPkts.push_back(recPktInfo);
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
}
#endif
- c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium});
+ std::lock_guard<std::mutex> intEvLock(c->internalEventsMutex);
+ c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, enableLibSodium});
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
return 0;
}
- std::lock_guard<std::mutex> lock(c->conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
auto iter = c->conMap.find(id);
if(iter != c->conMap.end()) {
if(exists) {
return;
}
- c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
+ std::lock_guard<std::mutex> intEvLock(c->internalEventsMutex);
+ c->internalEvents.push_back(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
return;
}
return 0;
}
- std::lock_guard<std::mutex> lock(c->conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
}
return nullptr;
}
- std::lock_guard<std::mutex> lock(c->conMapMutex);
+ std::lock_guard<std::mutex> conMapLock(c->conMapMutex);
if(c->conMap.empty()) {
if(size) {
return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
}
- auto optE = c->externalEvents.top_and_pop_and_rsize(remaining);
- if(optE) {
- return *optE;
- } else {
+ std::lock_guard<std::mutex> extEvLock(c->externalEventsMutex);
+ if(c->externalEvents.empty()) {
+ if(remaining) { *remaining = 0; }
return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
+ } else {
+ auto event = c->externalEvents.front();
+ c->externalEvents.pop_front();
+ if(remaining) { *remaining = c->externalEvents.size(); }
+ return event;
}
}
return UDPC::get_empty_pinfo();
}
- auto opt_pinfo = c->receivedPkts.top_and_pop_and_rsize(remaining);
- if(opt_pinfo) {
- return *opt_pinfo;
+ std::lock_guard<std::mutex> receivedPktsLock(c->receivedPktsMutex);
+ if(c->receivedPkts.empty()) {
+ if(remaining) { *remaining = 0; }
+ return UDPC::get_empty_pinfo();
+ } else {
+ auto pinfo = c->receivedPkts.front();
+ c->receivedPkts.pop_front();
+ if(remaining) { *remaining = c->receivedPkts.size(); }
+ return pinfo;
}
- return UDPC::get_empty_pinfo();
}
int UDPC_set_libsodium_keys(UDPC_HContext ctx, unsigned char *sk, unsigned char *pk) {
return 0;
}
- std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+ std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
auto result = c->peerPKWhitelist.insert(UDPC::PKContainer(pk));
if(result.second) {
return c->peerPKWhitelist.size();
return 0;
}
- std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+ std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
if(c->peerPKWhitelist.find(UDPC::PKContainer(pk)) != c->peerPKWhitelist.end()) {
return 1;
}
return 0;
}
- std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+ std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
if(c->peerPKWhitelist.erase(UDPC::PKContainer(pk)) != 0) {
return 1;
}
return 0;
}
- std::lock_guard<std::mutex> lock(c->peerPKWhitelistMutex);
+ std::lock_guard<std::mutex> pkWhitelistLock(c->peerPKWhitelistMutex);
c->peerPKWhitelist.clear();
return 1;
}