Refactor locking mutex during update

This commit is contained in:
Stephen Seo 2020-01-05 15:05:23 +09:00
parent 7996bd5c36
commit 5c8480e5bc

View file

@ -317,6 +317,8 @@ void UDPC::Context::update_impl() {
} }
newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT; newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
std::lock_guard<std::mutex> lock(conMapMutex);
if(conMap.find(optE->conId) == conMap.end()) { if(conMap.find(optE->conId) == conMap.end()) {
conMap.insert(std::make_pair( conMap.insert(std::make_pair(
optE->conId, optE->conId,
@ -349,6 +351,8 @@ void UDPC::Context::update_impl() {
} }
break; break;
case UDPC_ET_REQUEST_DISCONNECT: case UDPC_ET_REQUEST_DISCONNECT:
{
std::lock_guard<std::mutex> lock(conMapMutex);
if(optE->v.dropAllWithAddr != 0) { if(optE->v.dropAllWithAddr != 0) {
// drop all connections with same address // drop all connections with same address
auto addrConIter = addrConMap.find(optE->conId.addr); auto addrConIter = addrConMap.find(optE->conId.addr);
@ -369,6 +373,7 @@ void UDPC::Context::update_impl() {
deletionMap.insert(iter->first); deletionMap.insert(iter->first);
} }
} }
}
break; break;
default: default:
assert(!"internalEvents got invalid type"); assert(!"internalEvents got invalid type");
@ -380,6 +385,7 @@ void UDPC::Context::update_impl() {
{ {
// check timed out, check good/bad mode with rtt, remove timed out // check timed out, check good/bad mode with rtt, remove timed out
std::vector<UDPC_ConnectionId> removed; std::vector<UDPC_ConnectionId> removed;
std::lock_guard<std::mutex> lock(conMapMutex);
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
temp_dt_fs = now - iter->second.received; temp_dt_fs = now - iter->second.received;
if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) { if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) {
@ -499,6 +505,7 @@ void UDPC::Context::update_impl() {
while(true) { while(true) {
auto next = sendIter.current(); auto next = sendIter.current();
if(next) { if(next) {
std::lock_guard<std::mutex> lock(conMapMutex);
auto iter = conMap.find(next->receiver); auto iter = conMap.find(next->receiver);
if(iter != conMap.end()) { if(iter != conMap.end()) {
if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) { if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
@ -551,217 +558,27 @@ void UDPC::Context::update_impl() {
} }
// update send (only if triggerSend flag is set) // update send (only if triggerSend flag is set)
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { {
auto delIter = deletionMap.find(iter->first); std::lock_guard<std::mutex> lock(conMapMutex);
if(!iter->second.flags.test(0) && delIter == deletionMap.end()) { for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
continue; auto delIter = deletionMap.find(iter->first);
} else if(delIter != deletionMap.end()) { if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
if(iter->second.flags.test(3)) {
// not initiated connection yet, no need to send disconnect pkt
continue; continue;
} } else if(delIter != deletionMap.end()) {
unsigned int sendSize = 0; if(iter->second.flags.test(3)) {
std::unique_ptr<char[]> buf; // not initiated connection yet, no need to send disconnect pkt
if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else {
sendSize = UDPC_NSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
UDPC::preparePacket(
buf.get(),
protocolID,
iter->second.id,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0x3);
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
unsigned char sig[crypto_sign_BYTES];
std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
if(crypto_sign_detached(
sig, nullptr,
(unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
iter->second.sk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ",
iter->second.port);
continue; continue;
} }
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
}
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send disconnect packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
}
continue;
}
// clear triggerSend flag
iter->second.flags.reset(0);
if(iter->second.flags.test(3)) {
if(flags.test(1)) {
// is initiating connection to server
auto initDT = now - iter->second.sent;
if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
continue;
}
iter->second.sent = now;
std::unique_ptr<char[]> buf;
unsigned int sendSize = 0; unsigned int sendSize = 0;
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
sendSize = UDPC_CCL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
// set type 1
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
// set public key
std::memcpy(
buf.get() + UDPC_MIN_HEADER_SIZE + 4,
iter->second.pk,
crypto_sign_PUBLICKEYBYTES);
// set verify message
std::time_t time = std::time(nullptr);
if(time <= 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to get current epoch time");
continue;
}
uint64_t timeInt = time;
# ifndef NDEBUG
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
"Client set up verification epoch time \"",
timeInt, "\"");
# endif
UDPC::be64((char*)&timeInt);
iter->second.verifyMessage =
std::unique_ptr<char[]>(new char[8]);
std::memcpy(
iter->second.verifyMessage.get(),
&timeInt,
8);
std::memcpy(
buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
&timeInt,
8);
#else
assert(!"libsodium is disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
} else {
sendSize = UDPC_CON_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((uint32_t*)(buf.get() + 20)) = 0;
}
UDPC::preparePacket(
buf.get(),
protocolID,
0,
0,
0xFFFFFFFF,
nullptr,
0x1);
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send packet to initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
} else {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ", iter->second.port,
flags.test(2) && iter->second.flags.test(6) ?
", libsodium enabled" : ", libsodium disabled");
}
} else {
// is server, initiate connection to client
iter->second.flags.reset(3);
iter->second.sent = now;
std::unique_ptr<char[]> buf; std::unique_ptr<char[]> buf;
unsigned int sendSize = 0;
if(flags.test(2) && iter->second.flags.test(6)) { if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED sendSize = UDPC_LSFULL_HEADER_SIZE;
sendSize = UDPC_CSR_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]); buf = std::unique_ptr<char[]>(new char[sendSize]);
// set type *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
// set pubkey
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4,
iter->second.pk,
crypto_sign_PUBLICKEYBYTES);
// set detached sig
assert(iter->second.verifyMessage &&
"Detached sig in verifyMessage must exist");
std::memcpy(
buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
iter->second.verifyMessage.get(),
crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
} else { } else {
sendSize = UDPC_CON_HEADER_SIZE; sendSize = UDPC_NSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]); buf = std::unique_ptr<char[]>(new char[sendSize]);
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
} }
UDPC::preparePacket( UDPC::preparePacket(
buf.get(), buf.get(),
@ -770,11 +587,37 @@ void UDPC::Context::update_impl() {
iter->second.rseq, iter->second.rseq,
iter->second.ack, iter->second.ack,
&iter->second.lseq, &iter->second.lseq,
0x1); 0x3);
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
unsigned char sig[crypto_sign_BYTES];
std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
if(crypto_sign_detached(
sig, nullptr,
(unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
iter->second.sk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ",
iter->second.port);
continue;
}
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
}
UDPC_IPV6_SOCKADDR_TYPE destinationInfo; UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6; destinationInfo.sin6_family = AF_INET6;
std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port); destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0; destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id; destinationInfo.sin6_scope_id = iter->first.scope_id;
@ -788,239 +631,407 @@ void UDPC::Context::update_impl() {
if(sentBytes != sendSize) { if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this, UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR, UDPC_LoggingType::UDPC_ERROR,
"Failed to send packet to initiate connection to ", "Failed to send disconnect packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr), UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ", ", port = ",
iter->second.port); iter->second.port);
continue; continue;
} }
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
"Sent init pkt to client ",
UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr),
", port ", iter->second.port);
}
continue;
}
// Not initiating connection, send as normal on current connection
if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
// nothing in queues, send heartbeat packet
auto sentDT = now - iter->second.sent;
if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
continue; continue;
} }
unsigned int sendSize = 0; // clear triggerSend flag
std::unique_ptr<char[]> buf; iter->second.flags.reset(0);
if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE; if(iter->second.flags.test(3)) {
buf = std::unique_ptr<char[]>(new char[sendSize]); if(flags.test(1)) {
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; // is initiating connection to server
} else { auto initDT = now - iter->second.sent;
sendSize = UDPC_NSFULL_HEADER_SIZE; if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
buf = std::unique_ptr<char[]>(new char[sendSize]); continue;
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; }
} iter->second.sent = now;
UDPC::preparePacket(
buf.get(), std::unique_ptr<char[]> buf;
protocolID, unsigned int sendSize = 0;
iter->second.id, if(flags.test(2) && iter->second.flags.test(6)) {
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0);
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED #ifdef UDPC_LIBSODIUM_ENABLED
unsigned char sig[crypto_sign_BYTES]; sendSize = UDPC_CCL_HEADER_SIZE;
std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); buf = std::unique_ptr<char[]>(new char[sendSize]);
if(crypto_sign_detached( // set type 1
sig, nullptr, *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
(unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE, // set public key
iter->second.sk) != 0) { std::memcpy(
buf.get() + UDPC_MIN_HEADER_SIZE + 4,
iter->second.pk,
crypto_sign_PUBLICKEYBYTES);
// set verify message
std::time_t time = std::time(nullptr);
if(time <= 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to get current epoch time");
continue;
}
uint64_t timeInt = time;
# ifndef NDEBUG
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
"Client set up verification epoch time \"",
timeInt, "\"");
# endif
UDPC::be64((char*)&timeInt);
iter->second.verifyMessage =
std::unique_ptr<char[]>(new char[8]);
std::memcpy(
iter->second.verifyMessage.get(),
&timeInt,
8);
std::memcpy(
buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
&timeInt,
8);
#else
assert(!"libsodium is disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
} else {
sendSize = UDPC_CON_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((uint32_t*)(buf.get() + 20)) = 0;
}
UDPC::preparePacket(
buf.get(),
protocolID,
0,
0,
0xFFFFFFFF,
nullptr,
0x1);
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send packet to initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
} else {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ", iter->second.port,
flags.test(2) && iter->second.flags.test(6) ?
", libsodium enabled" : ", libsodium disabled");
}
} else {
// is server, initiate connection to client
iter->second.flags.reset(3);
iter->second.sent = now;
std::unique_ptr<char[]> buf;
unsigned int sendSize = 0;
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
sendSize = UDPC_CSR_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
// set type
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
// set pubkey
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4,
iter->second.pk,
crypto_sign_PUBLICKEYBYTES);
// set detached sig
assert(iter->second.verifyMessage &&
"Detached sig in verifyMessage must exist");
std::memcpy(
buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
iter->second.verifyMessage.get(),
crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
} else {
sendSize = UDPC_CON_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
UDPC::preparePacket(
buf.get(),
protocolID,
iter->second.id,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0x1);
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send packet to initiate connection to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
}
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
"Sent init pkt to client ",
UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr),
", port ", iter->second.port);
}
continue;
}
// Not initiating connection, send as normal on current connection
if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
// nothing in queues, send heartbeat packet
auto sentDT = now - iter->second.sent;
if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
continue;
}
unsigned int sendSize = 0;
std::unique_ptr<char[]> buf;
if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else {
sendSize = UDPC_NSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
UDPC::preparePacket(
buf.get(),
protocolID,
iter->second.id,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0);
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
unsigned char sig[crypto_sign_BYTES];
std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
if(crypto_sign_detached(
sig, nullptr,
(unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
iter->second.sk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ",
iter->second.port);
continue;
}
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ", "libsodium is disabled, cannot send packet");
continue;
#endif
}
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send heartbeat packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr), UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ", ", port = ",
iter->second.port); iter->second.port);
continue; continue;
} }
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
}
UDPC_IPV6_SOCKADDR_TYPE destinationInfo; UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
destinationInfo.sin6_family = AF_INET6; pInfo.flags = 0x4;
std::memcpy( pInfo.sender.addr = in6addr_loopback;
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), pInfo.receiver.addr = iter->first.addr;
UDPC_IPV6_ADDR_SUB(iter->first.addr), pInfo.sender.port = ntohs(socketInfo.sin6_port);
16); pInfo.receiver.port = iter->second.port;
destinationInfo.sin6_port = htons(iter->second.port); *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
destinationInfo.sin6_flowinfo = 0; pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send heartbeat packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
}
UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); iter->second.sentPkts.push_back(std::move(pInfo));
pInfo.flags = 0x4; iter->second.cleanupSentPkts();
pInfo.sender.addr = in6addr_loopback;
pInfo.receiver.addr = iter->first.addr;
pInfo.sender.port = ntohs(socketInfo.sin6_port);
pInfo.receiver.port = iter->second.port;
*((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0;
iter->second.sentPkts.push_back(std::move(pInfo)); // store other pkt info
iter->second.cleanupSentPkts(); UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
sentPktInfo->id = iter->second.lseq - 1;
// store other pkt info iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} else {
// sendPkts or priorityPkts not empty
UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
bool isResending = false;
if(!iter->second.priorityPkts.empty()) {
pInfo = iter->second.priorityPkts.front();
iter->second.priorityPkts.pop_front();
isResending = true;
} else { } else {
pInfo = iter->second.sendPkts.front(); // sendPkts or priorityPkts not empty
iter->second.sendPkts.pop_front(); UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
} bool isResending = false;
if(!iter->second.priorityPkts.empty()) {
pInfo = iter->second.priorityPkts.front();
iter->second.priorityPkts.pop_front();
isResending = true;
} else {
pInfo = iter->second.sendPkts.front();
iter->second.sendPkts.pop_front();
}
std::unique_ptr<char[]> buf; std::unique_ptr<char[]> buf;
unsigned int sendSize = 0; unsigned int sendSize = 0;
if(flags.test(2) && iter->second.flags.test(6)) { if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize; sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize;
buf = std::unique_ptr<char[]>(new char[sendSize]); buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else { } else {
sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize; sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize;
buf = std::unique_ptr<char[]>(new char[sendSize]); buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
} }
UDPC::preparePacket( UDPC::preparePacket(
buf.get(), buf.get(),
protocolID, protocolID,
iter->second.id, iter->second.id,
iter->second.rseq, iter->second.rseq,
iter->second.ack, iter->second.ack,
&iter->second.lseq, &iter->second.lseq,
(pInfo.flags & 0x4) | (isResending ? 0x8 : 0)); (pInfo.flags & 0x4) | (isResending ? 0x8 : 0));
if(flags.test(2) && iter->second.flags.test(6)) { if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED #ifdef UDPC_LIBSODIUM_ENABLED
unsigned char sig[crypto_sign_BYTES]; unsigned char sig[crypto_sign_BYTES];
std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize); std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
if(crypto_sign_detached( if(crypto_sign_detached(
sig, nullptr, sig, nullptr,
(unsigned char*)buf.get(), sendSize, (unsigned char*)buf.get(), sendSize,
iter->second.sk) != 0) { iter->second.sk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ",
iter->second.port);
continue;
}
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ", "libsodium is disabled, cannot send packet");
continue;
#endif
} else {
std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
}
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr), UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ", ", port = ",
iter->second.port); iter->second.port);
continue; continue;
} }
std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else if((pInfo.flags & 0x4) == 0) {
assert(!"libsodium disabled, invalid state"); // is check-received, store data in case packet gets lost
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
"libsodium is disabled, cannot send packet"); std::memcpy(sentPInfo.data, buf.get(), sendSize);
continue; sentPInfo.flags = 0;
#endif sentPInfo.dataSize = sendSize;
} else { sentPInfo.sender.addr = in6addr_loopback;
std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize); sentPInfo.receiver.addr = iter->first.addr;
sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
sentPInfo.receiver.port = iter->second.port;
iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts();
} else {
// is not check-received, only id stored in data array
UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
sentPInfo.flags = 0x4;
sentPInfo.dataSize = 0;
sentPInfo.sender.addr = in6addr_loopback;
sentPInfo.receiver.addr = iter->first.addr;
sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
sentPInfo.receiver.port = iter->second.port;
*((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts();
}
// store other pkt info
UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} }
iter->second.sent = now;
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
}
if((pInfo.flags & 0x4) == 0) {
// is check-received, store data in case packet gets lost
UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
std::memcpy(sentPInfo.data, buf.get(), sendSize);
sentPInfo.flags = 0;
sentPInfo.dataSize = sendSize;
sentPInfo.sender.addr = in6addr_loopback;
sentPInfo.receiver.addr = iter->first.addr;
sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
sentPInfo.receiver.port = iter->second.port;
iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts();
} else {
// is not check-received, only id stored in data array
UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
sentPInfo.flags = 0x4;
sentPInfo.dataSize = 0;
sentPInfo.sender.addr = in6addr_loopback;
sentPInfo.receiver.addr = iter->first.addr;
sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
sentPInfo.receiver.port = iter->second.port;
*((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts();
}
// store other pkt info
UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} }
iter->second.sent = now;
} }
// remove queued for deletion // remove queued for deletion
for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) { for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
std::lock_guard<std::mutex> lock(conMapMutex);
auto iter = conMap.find(*delIter); auto iter = conMap.find(*delIter);
if(iter != conMap.end()) { if(iter != conMap.end()) {
if(iter->second.flags.test(4)) { if(iter->second.flags.test(4)) {
@ -1166,6 +1177,7 @@ void UDPC::Context::update_impl() {
if(isConnect && !isPing) { if(isConnect && !isPing) {
// is connect packet and is accepting new connections // is connect packet and is accepting new connections
std::lock_guard<std::mutex> lock(conMapMutex);
if(!flags.test(1) if(!flags.test(1)
&& conMap.find(identifier) == conMap.end() && conMap.find(identifier) == conMap.end()
&& isAcceptNewConnections.load()) { && isAcceptNewConnections.load()) {
@ -1397,6 +1409,7 @@ void UDPC::Context::update_impl() {
return; return;
} }
std::lock_guard<std::mutex> lock(conMapMutex);
auto iter = conMap.find(identifier); auto iter = conMap.find(identifier);
if(iter == conMap.end() || iter->second.flags.test(3) if(iter == conMap.end() || iter->second.flags.test(3)
|| !iter->second.flags.test(4) || iter->second.id != conID) { || !iter->second.flags.test(4) || iter->second.id != conID) {
@ -1780,10 +1793,7 @@ void UDPC::threadedUpdate(Context *ctx) {
decltype(now) nextNow; decltype(now) nextNow;
while(ctx->threadRunning.load()) { while(ctx->threadRunning.load()) {
now = std::chrono::steady_clock::now(); now = std::chrono::steady_clock::now();
{ ctx->update_impl();
std::lock_guard<std::mutex> lock(ctx->conMapMutex);
ctx->update_impl();
}
nextNow = std::chrono::steady_clock::now(); nextNow = std::chrono::steady_clock::now();
std::this_thread::sleep_for(ctx->threadedSleepTime - (nextNow - now)); std::this_thread::sleep_for(ctx->threadedSleepTime - (nextNow - now));
} }
@ -2066,7 +2076,6 @@ void UDPC_update(UDPC_HContext ctx) {
return; return;
} }
std::lock_guard<std::mutex> lock(c->conMapMutex);
c->update_impl(); c->update_impl();
} }