}
newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
+
+ std::lock_guard<std::mutex> lock(conMapMutex);
if(conMap.find(optE->conId) == conMap.end()) {
conMap.insert(std::make_pair(
optE->conId,
}
break;
case UDPC_ET_REQUEST_DISCONNECT:
+ {
+ std::lock_guard<std::mutex> lock(conMapMutex);
if(optE->v.dropAllWithAddr != 0) {
// drop all connections with same address
auto addrConIter = addrConMap.find(optE->conId.addr);
deletionMap.insert(iter->first);
}
}
+ }
break;
default:
assert(!"internalEvents got invalid type");
{
// check timed out, check good/bad mode with rtt, remove timed out
std::vector<UDPC_ConnectionId> removed;
+ std::lock_guard<std::mutex> lock(conMapMutex);
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
temp_dt_fs = now - iter->second.received;
if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) {
while(true) {
auto next = sendIter.current();
if(next) {
+ std::lock_guard<std::mutex> lock(conMapMutex);
auto iter = conMap.find(next->receiver);
if(iter != conMap.end()) {
if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
}
// update send (only if triggerSend flag is set)
- for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
- auto delIter = deletionMap.find(iter->first);
- if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
- continue;
- } else if(delIter != deletionMap.end()) {
- if(iter->second.flags.test(3)) {
- // not initiated connection yet, no need to send disconnect pkt
+ {
+ std::lock_guard<std::mutex> lock(conMapMutex);
+ for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
+ auto delIter = deletionMap.find(iter->first);
+ if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
continue;
- }
- unsigned int sendSize = 0;
- std::unique_ptr<char[]> buf;
- if(flags.test(2) && iter->second.flags.test(6)) {
- sendSize = UDPC_LSFULL_HEADER_SIZE;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
- } else {
- sendSize = UDPC_NSFULL_HEADER_SIZE;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
- }
- UDPC::preparePacket(
- buf.get(),
- protocolID,
- iter->second.id,
- iter->second.rseq,
- iter->second.ack,
- &iter->second.lseq,
- 0x3);
- if(flags.test(2) && iter->second.flags.test(6)) {
+ } else if(delIter != deletionMap.end()) {
+ if(iter->second.flags.test(3)) {
+ // not initiated connection yet, no need to send disconnect pkt
+ continue;
+ }
+ unsigned int sendSize = 0;
+ std::unique_ptr<char[]> buf;
+ if(flags.test(2) && iter->second.flags.test(6)) {
+ sendSize = UDPC_LSFULL_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
+ } else {
+ sendSize = UDPC_NSFULL_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+ }
+ UDPC::preparePacket(
+ buf.get(),
+ protocolID,
+ iter->second.id,
+ iter->second.rseq,
+ iter->second.ack,
+ &iter->second.lseq,
+ 0x3);
+ if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
- unsigned char sig[crypto_sign_BYTES];
- std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
- if(crypto_sign_detached(
- sig, nullptr,
- (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
- iter->second.sk) != 0) {
+ unsigned char sig[crypto_sign_BYTES];
+ std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
+ if(crypto_sign_detached(
+ sig, nullptr,
+ (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
+ iter->second.sk) != 0) {
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+ "Failed to sign packet for peer ",
+ UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+ ", port ",
+ iter->second.port);
+ continue;
+ }
+ std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
+#else
+ assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "Failed to sign packet for peer ",
+ "libsodium is disabled, cannot send packet");
+ continue;
+#endif
+ }
+
+ UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
+ destinationInfo.sin6_family = AF_INET6;
+ std::memcpy(
+ UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+ UDPC_IPV6_ADDR_SUB(iter->first.addr),
+ 16);
+ destinationInfo.sin6_port = htons(iter->second.port);
+ destinationInfo.sin6_flowinfo = 0;
+ destinationInfo.sin6_scope_id = iter->first.scope_id;
+ long int sentBytes = sendto(
+ socketHandle,
+ buf.get(),
+ sendSize,
+ 0,
+ (struct sockaddr*) &destinationInfo,
+ sizeof(UDPC_IPV6_SOCKADDR_TYPE));
+ if(sentBytes != sendSize) {
+ UDPC_CHECK_LOG(this,
+ UDPC_LoggingType::UDPC_ERROR,
+ "Failed to send disconnect packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port ",
+ ", port = ",
iter->second.port);
continue;
}
- std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
-#else
- assert(!"libsodium disabled, invalid state");
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "libsodium is disabled, cannot send packet");
continue;
-#endif
}
- UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
- destinationInfo.sin6_family = AF_INET6;
- std::memcpy(
- UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
- UDPC_IPV6_ADDR_SUB(iter->first.addr),
- 16);
- destinationInfo.sin6_port = htons(iter->second.port);
- destinationInfo.sin6_flowinfo = 0;
- destinationInfo.sin6_scope_id = iter->first.scope_id;
- long int sentBytes = sendto(
- socketHandle,
- buf.get(),
- sendSize,
- 0,
- (struct sockaddr*) &destinationInfo,
- sizeof(UDPC_IPV6_SOCKADDR_TYPE));
- if(sentBytes != sendSize) {
- UDPC_CHECK_LOG(this,
- UDPC_LoggingType::UDPC_ERROR,
- "Failed to send disconnect packet to ",
- UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port = ",
- iter->second.port);
+ // clear triggerSend flag
+ iter->second.flags.reset(0);
+
+ if(iter->second.flags.test(3)) {
+ if(flags.test(1)) {
+ // is initiating connection to server
+ auto initDT = now - iter->second.sent;
+ if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
+ continue;
+ }
+ iter->second.sent = now;
+
+ std::unique_ptr<char[]> buf;
+ unsigned int sendSize = 0;
+ if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+ sendSize = UDPC_CCL_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ // set type 1
+ *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
+ // set public key
+ std::memcpy(
+ buf.get() + UDPC_MIN_HEADER_SIZE + 4,
+ iter->second.pk,
+ crypto_sign_PUBLICKEYBYTES);
+ // set verify message
+ std::time_t time = std::time(nullptr);
+ if(time <= 0) {
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+ "Failed to get current epoch time");
+ continue;
+ }
+ uint64_t timeInt = time;
+# ifndef NDEBUG
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
+ "Client set up verification epoch time \"",
+ timeInt, "\"");
+# endif
+ UDPC::be64((char*)&timeInt);
+ iter->second.verifyMessage =
+ std::unique_ptr<char[]>(new char[8]);
+ std::memcpy(
+ iter->second.verifyMessage.get(),
+ &timeInt,
+ 8);
+ std::memcpy(
+ buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
+ &timeInt,
+ 8);
+#else
+ assert(!"libsodium is disabled, invalid state");
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+ "libsodium is disabled, cannot send packet");
+ continue;
+#endif
+ } else {
+ sendSize = UDPC_CON_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ *((uint32_t*)(buf.get() + 20)) = 0;
+ }
+ UDPC::preparePacket(
+ buf.get(),
+ protocolID,
+ 0,
+ 0,
+ 0xFFFFFFFF,
+ nullptr,
+ 0x1);
+
+ UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
+ destinationInfo.sin6_family = AF_INET6;
+ std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+ destinationInfo.sin6_port = htons(iter->second.port);
+ destinationInfo.sin6_flowinfo = 0;
+ destinationInfo.sin6_scope_id = iter->first.scope_id;
+ long int sentBytes = sendto(
+ socketHandle,
+ buf.get(),
+ sendSize,
+ 0,
+ (struct sockaddr*) &destinationInfo,
+ sizeof(UDPC_IPV6_SOCKADDR_TYPE));
+ if(sentBytes != sendSize) {
+ UDPC_CHECK_LOG(this,
+ UDPC_LoggingType::UDPC_ERROR,
+ "Failed to send packet to initiate connection to ",
+ UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+ ", port = ",
+ iter->second.port);
+ continue;
+ } else {
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ",
+ UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+ ", port = ", iter->second.port,
+ flags.test(2) && iter->second.flags.test(6) ?
+ ", libsodium enabled" : ", libsodium disabled");
+ }
+ } else {
+ // is server, initiate connection to client
+ iter->second.flags.reset(3);
+ iter->second.sent = now;
+
+ std::unique_ptr<char[]> buf;
+ unsigned int sendSize = 0;
+ if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+ sendSize = UDPC_CSR_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ // set type
+ *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
+ // set pubkey
+ std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4,
+ iter->second.pk,
+ crypto_sign_PUBLICKEYBYTES);
+ // set detached sig
+ assert(iter->second.verifyMessage &&
+ "Detached sig in verifyMessage must exist");
+ std::memcpy(
+ buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
+ iter->second.verifyMessage.get(),
+ crypto_sign_BYTES);
+#else
+ assert(!"libsodium disabled, invalid state");
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+ "libsodium is disabled, cannot send packet");
+ continue;
+#endif
+ } else {
+ sendSize = UDPC_CON_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+ }
+ UDPC::preparePacket(
+ buf.get(),
+ protocolID,
+ iter->second.id,
+ iter->second.rseq,
+ iter->second.ack,
+ &iter->second.lseq,
+ 0x1);
+
+ UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
+ destinationInfo.sin6_family = AF_INET6;
+ std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+ destinationInfo.sin6_port = htons(iter->second.port);
+ destinationInfo.sin6_flowinfo = 0;
+ destinationInfo.sin6_scope_id = iter->first.scope_id;
+ long int sentBytes = sendto(
+ socketHandle,
+ buf.get(),
+ sendSize,
+ 0,
+ (struct sockaddr*) &destinationInfo,
+ sizeof(UDPC_IPV6_SOCKADDR_TYPE));
+ if(sentBytes != sendSize) {
+ UDPC_CHECK_LOG(this,
+ UDPC_LoggingType::UDPC_ERROR,
+ "Failed to send packet to initiate connection to ",
+ UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+ ", port = ",
+ iter->second.port);
+ continue;
+ }
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
+ "Sent init pkt to client ",
+ UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr),
+ ", port ", iter->second.port);
+ }
continue;
}
- continue;
- }
-
- // clear triggerSend flag
- iter->second.flags.reset(0);
- if(iter->second.flags.test(3)) {
- if(flags.test(1)) {
- // is initiating connection to server
- auto initDT = now - iter->second.sent;
- if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
+ // Not initiating connection, send as normal on current connection
+ if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
+ // nothing in queues, send heartbeat packet
+ auto sentDT = now - iter->second.sent;
+ if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
continue;
}
- iter->second.sent = now;
- std::unique_ptr<char[]> buf;
unsigned int sendSize = 0;
+ std::unique_ptr<char[]> buf;
if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
- sendSize = UDPC_CCL_HEADER_SIZE;
+ sendSize = UDPC_LSFULL_HEADER_SIZE;
+ buf = std::unique_ptr<char[]>(new char[sendSize]);
+ *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
+ } else {
+ sendSize = UDPC_NSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
- // set type 1
- *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
- // set public key
- std::memcpy(
- buf.get() + UDPC_MIN_HEADER_SIZE + 4,
- iter->second.pk,
- crypto_sign_PUBLICKEYBYTES);
- // set verify message
- std::time_t time = std::time(nullptr);
- if(time <= 0) {
+ *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+ }
+ UDPC::preparePacket(
+ buf.get(),
+ protocolID,
+ iter->second.id,
+ iter->second.rseq,
+ iter->second.ack,
+ &iter->second.lseq,
+ 0);
+ if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+ unsigned char sig[crypto_sign_BYTES];
+ std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
+ if(crypto_sign_detached(
+ sig, nullptr,
+ (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
+ iter->second.sk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "Failed to get current epoch time");
+ "Failed to sign packet for peer ",
+ UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+ ", port ",
+ iter->second.port);
continue;
}
- uint64_t timeInt = time;
-# ifndef NDEBUG
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
- "Client set up verification epoch time \"",
- timeInt, "\"");
-# endif
- UDPC::be64((char*)&timeInt);
- iter->second.verifyMessage =
- std::unique_ptr<char[]>(new char[8]);
- std::memcpy(
- iter->second.verifyMessage.get(),
- &timeInt,
- 8);
- std::memcpy(
- buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
- &timeInt,
- 8);
+ std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
#else
- assert(!"libsodium is disabled, invalid state");
+ assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
- } else {
- sendSize = UDPC_CON_HEADER_SIZE;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((uint32_t*)(buf.get() + 20)) = 0;
}
- UDPC::preparePacket(
- buf.get(),
- protocolID,
- 0,
- 0,
- 0xFFFFFFFF,
- nullptr,
- 0x1);
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
- std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+ std::memcpy(
+ UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+ UDPC_IPV6_ADDR_SUB(iter->first.addr),
+ 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
- "Failed to send packet to initiate connection to ",
+ "Failed to send heartbeat packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
- } else {
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ",
- UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port = ", iter->second.port,
- flags.test(2) && iter->second.flags.test(6) ?
- ", libsodium enabled" : ", libsodium disabled");
}
+
+ UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
+ pInfo.flags = 0x4;
+ pInfo.sender.addr = in6addr_loopback;
+ pInfo.receiver.addr = iter->first.addr;
+ pInfo.sender.port = ntohs(socketInfo.sin6_port);
+ pInfo.receiver.port = iter->second.port;
+ *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
+ pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0;
+
+ iter->second.sentPkts.push_back(std::move(pInfo));
+ iter->second.cleanupSentPkts();
+
+ // store other pkt info
+ UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
+ sentPktInfo->id = iter->second.lseq - 1;
+ iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} else {
- // is server, initiate connection to client
- iter->second.flags.reset(3);
- iter->second.sent = now;
+ // sendPkts or priorityPkts not empty
+ UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
+ bool isResending = false;
+ if(!iter->second.priorityPkts.empty()) {
+ pInfo = iter->second.priorityPkts.front();
+ iter->second.priorityPkts.pop_front();
+ isResending = true;
+ } else {
+ pInfo = iter->second.sendPkts.front();
+ iter->second.sendPkts.pop_front();
+ }
std::unique_ptr<char[]> buf;
unsigned int sendSize = 0;
if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
- sendSize = UDPC_CSR_HEADER_SIZE;
+ sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize;
buf = std::unique_ptr<char[]>(new char[sendSize]);
- // set type
- *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
- // set pubkey
- std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4,
- iter->second.pk,
- crypto_sign_PUBLICKEYBYTES);
- // set detached sig
- assert(iter->second.verifyMessage &&
- "Detached sig in verifyMessage must exist");
- std::memcpy(
- buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
- iter->second.verifyMessage.get(),
- crypto_sign_BYTES);
-#else
- assert(!"libsodium disabled, invalid state");
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "libsodium is disabled, cannot send packet");
- continue;
-#endif
+ *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else {
- sendSize = UDPC_CON_HEADER_SIZE;
+ sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize;
buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+ *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
+
UDPC::preparePacket(
buf.get(),
protocolID,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
- 0x1);
+ (pInfo.flags & 0x4) | (isResending ? 0x8 : 0));
+
+ if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+ unsigned char sig[crypto_sign_BYTES];
+ std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
+ std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
+ if(crypto_sign_detached(
+ sig, nullptr,
+ (unsigned char*)buf.get(), sendSize,
+ iter->second.sk) != 0) {
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+ "Failed to sign packet for peer ",
+ UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+ ", port ",
+ iter->second.port);
+ continue;
+ }
+ std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
+#else
+ assert(!"libsodium disabled, invalid state");
+ UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+ "libsodium is disabled, cannot send packet");
+ continue;
+#endif
+ } else {
+ std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
+ }
+
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
- std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+ std::memcpy(
+ UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+ UDPC_IPV6_ADDR_SUB(iter->first.addr),
+ 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
- "Failed to send packet to initiate connection to ",
+ "Failed to send packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
}
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
- "Sent init pkt to client ",
- UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr),
- ", port ", iter->second.port);
- }
- continue;
- }
-
- // Not initiating connection, send as normal on current connection
- if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
- // nothing in queues, send heartbeat packet
- auto sentDT = now - iter->second.sent;
- if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
- continue;
- }
-
- unsigned int sendSize = 0;
- std::unique_ptr<char[]> buf;
- if(flags.test(2) && iter->second.flags.test(6)) {
- sendSize = UDPC_LSFULL_HEADER_SIZE;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
- } else {
- sendSize = UDPC_NSFULL_HEADER_SIZE;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
- }
- UDPC::preparePacket(
- buf.get(),
- protocolID,
- iter->second.id,
- iter->second.rseq,
- iter->second.ack,
- &iter->second.lseq,
- 0);
- if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
- unsigned char sig[crypto_sign_BYTES];
- std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
- if(crypto_sign_detached(
- sig, nullptr,
- (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
- iter->second.sk) != 0) {
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "Failed to sign packet for peer ",
- UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port ",
- iter->second.port);
- continue;
- }
- std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
-#else
- assert(!"libsodium disabled, invalid state");
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "libsodium is disabled, cannot send packet");
- continue;
-#endif
- }
-
- UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
- destinationInfo.sin6_family = AF_INET6;
- std::memcpy(
- UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
- UDPC_IPV6_ADDR_SUB(iter->first.addr),
- 16);
- destinationInfo.sin6_port = htons(iter->second.port);
- destinationInfo.sin6_flowinfo = 0;
- destinationInfo.sin6_scope_id = iter->first.scope_id;
- long int sentBytes = sendto(
- socketHandle,
- buf.get(),
- sendSize,
- 0,
- (struct sockaddr*) &destinationInfo,
- sizeof(UDPC_IPV6_SOCKADDR_TYPE));
- if(sentBytes != sendSize) {
- UDPC_CHECK_LOG(this,
- UDPC_LoggingType::UDPC_ERROR,
- "Failed to send heartbeat packet to ",
- UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port = ",
- iter->second.port);
- continue;
- }
-
- UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
- pInfo.flags = 0x4;
- pInfo.sender.addr = in6addr_loopback;
- pInfo.receiver.addr = iter->first.addr;
- pInfo.sender.port = ntohs(socketInfo.sin6_port);
- pInfo.receiver.port = iter->second.port;
- *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
- pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0;
-
- iter->second.sentPkts.push_back(std::move(pInfo));
- iter->second.cleanupSentPkts();
-
- // store other pkt info
- UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
- sentPktInfo->id = iter->second.lseq - 1;
- iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
- } else {
- // sendPkts or priorityPkts not empty
- UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
- bool isResending = false;
- if(!iter->second.priorityPkts.empty()) {
- pInfo = iter->second.priorityPkts.front();
- iter->second.priorityPkts.pop_front();
- isResending = true;
- } else {
- pInfo = iter->second.sendPkts.front();
- iter->second.sendPkts.pop_front();
- }
-
- std::unique_ptr<char[]> buf;
- unsigned int sendSize = 0;
- if(flags.test(2) && iter->second.flags.test(6)) {
- sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
- } else {
- sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize;
- buf = std::unique_ptr<char[]>(new char[sendSize]);
- *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
- }
-
- UDPC::preparePacket(
- buf.get(),
- protocolID,
- iter->second.id,
- iter->second.rseq,
- iter->second.ack,
- &iter->second.lseq,
- (pInfo.flags & 0x4) | (isResending ? 0x8 : 0));
- if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
- unsigned char sig[crypto_sign_BYTES];
- std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
- std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
- if(crypto_sign_detached(
- sig, nullptr,
- (unsigned char*)buf.get(), sendSize,
- iter->second.sk) != 0) {
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "Failed to sign packet for peer ",
- UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port ",
- iter->second.port);
- continue;
+ if((pInfo.flags & 0x4) == 0) {
+ // is check-received, store data in case packet gets lost
+ UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
+ std::memcpy(sentPInfo.data, buf.get(), sendSize);
+ sentPInfo.flags = 0;
+ sentPInfo.dataSize = sendSize;
+ sentPInfo.sender.addr = in6addr_loopback;
+ sentPInfo.receiver.addr = iter->first.addr;
+ sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
+ sentPInfo.receiver.port = iter->second.port;
+
+ iter->second.sentPkts.push_back(std::move(sentPInfo));
+ iter->second.cleanupSentPkts();
+ } else {
+ // is not check-received, only id stored in data array
+ UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
+ sentPInfo.flags = 0x4;
+ sentPInfo.dataSize = 0;
+ sentPInfo.sender.addr = in6addr_loopback;
+ sentPInfo.receiver.addr = iter->first.addr;
+ sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
+ sentPInfo.receiver.port = iter->second.port;
+ *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
+
+ iter->second.sentPkts.push_back(std::move(sentPInfo));
+ iter->second.cleanupSentPkts();
}
- std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
-#else
- assert(!"libsodium disabled, invalid state");
- UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
- "libsodium is disabled, cannot send packet");
- continue;
-#endif
- } else {
- std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
- }
-
- UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
- destinationInfo.sin6_family = AF_INET6;
- std::memcpy(
- UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
- UDPC_IPV6_ADDR_SUB(iter->first.addr),
- 16);
- destinationInfo.sin6_port = htons(iter->second.port);
- destinationInfo.sin6_flowinfo = 0;
- destinationInfo.sin6_scope_id = iter->first.scope_id;
- long int sentBytes = sendto(
- socketHandle,
- buf.get(),
- sendSize,
- 0,
- (struct sockaddr*) &destinationInfo,
- sizeof(UDPC_IPV6_SOCKADDR_TYPE));
- if(sentBytes != sendSize) {
- UDPC_CHECK_LOG(this,
- UDPC_LoggingType::UDPC_ERROR,
- "Failed to send packet to ",
- UDPC_atostr((UDPC_HContext)this, iter->first.addr),
- ", port = ",
- iter->second.port);
- continue;
- }
-
- if((pInfo.flags & 0x4) == 0) {
- // is check-received, store data in case packet gets lost
- UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
- std::memcpy(sentPInfo.data, buf.get(), sendSize);
- sentPInfo.flags = 0;
- sentPInfo.dataSize = sendSize;
- sentPInfo.sender.addr = in6addr_loopback;
- sentPInfo.receiver.addr = iter->first.addr;
- sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
- sentPInfo.receiver.port = iter->second.port;
-
- iter->second.sentPkts.push_back(std::move(sentPInfo));
- iter->second.cleanupSentPkts();
- } else {
- // is not check-received, only id stored in data array
- UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
- sentPInfo.flags = 0x4;
- sentPInfo.dataSize = 0;
- sentPInfo.sender.addr = in6addr_loopback;
- sentPInfo.receiver.addr = iter->first.addr;
- sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
- sentPInfo.receiver.port = iter->second.port;
- *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
-
- iter->second.sentPkts.push_back(std::move(sentPInfo));
- iter->second.cleanupSentPkts();
+ // store other pkt info
+ UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
+ sentPktInfo->id = iter->second.lseq - 1;
+ iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
}
-
- // store other pkt info
- UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
- sentPktInfo->id = iter->second.lseq - 1;
- iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
+ iter->second.sent = now;
}
- iter->second.sent = now;
}
// remove queued for deletion
for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
+ std::lock_guard<std::mutex> lock(conMapMutex);
auto iter = conMap.find(*delIter);
if(iter != conMap.end()) {
if(iter->second.flags.test(4)) {
if(isConnect && !isPing) {
// is connect packet and is accepting new connections
+ std::lock_guard<std::mutex> lock(conMapMutex);
if(!flags.test(1)
&& conMap.find(identifier) == conMap.end()
&& isAcceptNewConnections.load()) {
return;
}
+ std::lock_guard<std::mutex> lock(conMapMutex);
auto iter = conMap.find(identifier);
if(iter == conMap.end() || iter->second.flags.test(3)
|| !iter->second.flags.test(4) || iter->second.id != conID) {
decltype(now) nextNow;
while(ctx->threadRunning.load()) {
now = std::chrono::steady_clock::now();
- {
- std::lock_guard<std::mutex> lock(ctx->conMapMutex);
- ctx->update_impl();
- }
+ ctx->update_impl();
nextNow = std::chrono::steady_clock::now();
std::this_thread::sleep_for(ctx->threadedSleepTime - (nextNow - now));
}
return;
}
- std::lock_guard<std::mutex> lock(c->conMapMutex);
c->update_impl();
}