diff --git a/src/UDPConnection.cpp b/src/UDPConnection.cpp index 73a8fe8..3d62434 100644 --- a/src/UDPConnection.cpp +++ b/src/UDPConnection.cpp @@ -317,6 +317,8 @@ void UDPC::Context::update_impl() { } newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT; + + std::lock_guard lock(conMapMutex); if(conMap.find(optE->conId) == conMap.end()) { conMap.insert(std::make_pair( optE->conId, @@ -349,6 +351,8 @@ void UDPC::Context::update_impl() { } break; case UDPC_ET_REQUEST_DISCONNECT: + { + std::lock_guard lock(conMapMutex); if(optE->v.dropAllWithAddr != 0) { // drop all connections with same address auto addrConIter = addrConMap.find(optE->conId.addr); @@ -369,6 +373,7 @@ void UDPC::Context::update_impl() { deletionMap.insert(iter->first); } } + } break; default: assert(!"internalEvents got invalid type"); @@ -380,6 +385,7 @@ void UDPC::Context::update_impl() { { // check timed out, check good/bad mode with rtt, remove timed out std::vector removed; + std::lock_guard lock(conMapMutex); for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { temp_dt_fs = now - iter->second.received; if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) { @@ -499,6 +505,7 @@ void UDPC::Context::update_impl() { while(true) { auto next = sendIter.current(); if(next) { + std::lock_guard lock(conMapMutex); auto iter = conMap.find(next->receiver); if(iter != conMap.end()) { if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) { @@ -551,217 +558,27 @@ void UDPC::Context::update_impl() { } // update send (only if triggerSend flag is set) - for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { - auto delIter = deletionMap.find(iter->first); - if(!iter->second.flags.test(0) && delIter == deletionMap.end()) { - continue; - } else if(delIter != deletionMap.end()) { - if(iter->second.flags.test(3)) { - // not initiated connection yet, no need to send disconnect pkt + { + std::lock_guard lock(conMapMutex); + for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { + auto delIter = deletionMap.find(iter->first); + if(!iter->second.flags.test(0) && delIter == deletionMap.end()) { continue; - } - unsigned int sendSize = 0; - std::unique_ptr buf; - if(flags.test(2) && iter->second.flags.test(6)) { - sendSize = UDPC_LSFULL_HEADER_SIZE; - buf = std::unique_ptr(new char[sendSize]); - *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; - } else { - sendSize = UDPC_NSFULL_HEADER_SIZE; - buf = std::unique_ptr(new char[sendSize]); - *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; - } - UDPC::preparePacket( - buf.get(), - protocolID, - iter->second.id, - iter->second.rseq, - iter->second.ack, - &iter->second.lseq, - 0x3); - if(flags.test(2) && iter->second.flags.test(6)) { -#ifdef UDPC_LIBSODIUM_ENABLED - unsigned char sig[crypto_sign_BYTES]; - std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); - if(crypto_sign_detached( - sig, nullptr, - (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE, - iter->second.sk) != 0) { - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "Failed to sign packet for peer ", - UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port ", - iter->second.port); + } else if(delIter != deletionMap.end()) { + if(iter->second.flags.test(3)) { + // not initiated connection yet, no need to send disconnect pkt continue; } - std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES); -#else - assert(!"libsodium disabled, invalid state"); - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "libsodium is disabled, cannot send packet"); - continue; -#endif - } - - UDPC_IPV6_SOCKADDR_TYPE destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy( - UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), - UDPC_IPV6_ADDR_SUB(iter->first.addr), - 16); - destinationInfo.sin6_port = htons(iter->second.port); - destinationInfo.sin6_flowinfo = 0; - destinationInfo.sin6_scope_id = iter->first.scope_id; - long int sentBytes = sendto( - socketHandle, - buf.get(), - sendSize, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(UDPC_IPV6_SOCKADDR_TYPE)); - if(sentBytes != sendSize) { - UDPC_CHECK_LOG(this, - UDPC_LoggingType::UDPC_ERROR, - "Failed to send disconnect packet to ", - UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port = ", - iter->second.port); - continue; - } - continue; - } - - // clear triggerSend flag - iter->second.flags.reset(0); - - if(iter->second.flags.test(3)) { - if(flags.test(1)) { - // is initiating connection to server - auto initDT = now - iter->second.sent; - if(initDT < UDPC::INIT_PKT_INTERVAL_DT) { - continue; - } - iter->second.sent = now; - - std::unique_ptr buf; unsigned int sendSize = 0; - if(flags.test(2) && iter->second.flags.test(6)) { -#ifdef UDPC_LIBSODIUM_ENABLED - sendSize = UDPC_CCL_HEADER_SIZE; - buf = std::unique_ptr(new char[sendSize]); - // set type 1 - *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1); - // set public key - std::memcpy( - buf.get() + UDPC_MIN_HEADER_SIZE + 4, - iter->second.pk, - crypto_sign_PUBLICKEYBYTES); - // set verify message - std::time_t time = std::time(nullptr); - if(time <= 0) { - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "Failed to get current epoch time"); - continue; - } - uint64_t timeInt = time; -# ifndef NDEBUG - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG, - "Client set up verification epoch time \"", - timeInt, "\""); -# endif - UDPC::be64((char*)&timeInt); - iter->second.verifyMessage = - std::unique_ptr(new char[8]); - std::memcpy( - iter->second.verifyMessage.get(), - &timeInt, - 8); - std::memcpy( - buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES, - &timeInt, - 8); -#else - assert(!"libsodium is disabled, invalid state"); - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "libsodium is disabled, cannot send packet"); - continue; -#endif - } else { - sendSize = UDPC_CON_HEADER_SIZE; - buf = std::unique_ptr(new char[sendSize]); - *((uint32_t*)(buf.get() + 20)) = 0; - } - UDPC::preparePacket( - buf.get(), - protocolID, - 0, - 0, - 0xFFFFFFFF, - nullptr, - 0x1); - - UDPC_IPV6_SOCKADDR_TYPE destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); - destinationInfo.sin6_port = htons(iter->second.port); - destinationInfo.sin6_flowinfo = 0; - destinationInfo.sin6_scope_id = iter->first.scope_id; - long int sentBytes = sendto( - socketHandle, - buf.get(), - sendSize, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(UDPC_IPV6_SOCKADDR_TYPE)); - if(sentBytes != sendSize) { - UDPC_CHECK_LOG(this, - UDPC_LoggingType::UDPC_ERROR, - "Failed to send packet to initiate connection to ", - UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port = ", - iter->second.port); - continue; - } else { - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ", - UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port = ", iter->second.port, - flags.test(2) && iter->second.flags.test(6) ? - ", libsodium enabled" : ", libsodium disabled"); - } - } else { - // is server, initiate connection to client - iter->second.flags.reset(3); - iter->second.sent = now; - std::unique_ptr buf; - unsigned int sendSize = 0; if(flags.test(2) && iter->second.flags.test(6)) { -#ifdef UDPC_LIBSODIUM_ENABLED - sendSize = UDPC_CSR_HEADER_SIZE; + sendSize = UDPC_LSFULL_HEADER_SIZE; buf = std::unique_ptr(new char[sendSize]); - // set type - *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2); - // set pubkey - std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4, - iter->second.pk, - crypto_sign_PUBLICKEYBYTES); - // set detached sig - assert(iter->second.verifyMessage && - "Detached sig in verifyMessage must exist"); - std::memcpy( - buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES, - iter->second.verifyMessage.get(), - crypto_sign_BYTES); -#else - assert(!"libsodium disabled, invalid state"); - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "libsodium is disabled, cannot send packet"); - continue; -#endif + *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; } else { - sendSize = UDPC_CON_HEADER_SIZE; + sendSize = UDPC_NSFULL_HEADER_SIZE; buf = std::unique_ptr(new char[sendSize]); - *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; + *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; } UDPC::preparePacket( buf.get(), @@ -770,11 +587,37 @@ void UDPC::Context::update_impl() { iter->second.rseq, iter->second.ack, &iter->second.lseq, - 0x1); + 0x3); + if(flags.test(2) && iter->second.flags.test(6)) { +#ifdef UDPC_LIBSODIUM_ENABLED + unsigned char sig[crypto_sign_BYTES]; + std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); + if(crypto_sign_detached( + sig, nullptr, + (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE, + iter->second.sk) != 0) { + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "Failed to sign packet for peer ", + UDPC_atostr((UDPC_HContext)this, iter->first.addr), + ", port ", + iter->second.port); + continue; + } + std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES); +#else + assert(!"libsodium disabled, invalid state"); + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "libsodium is disabled, cannot send packet"); + continue; +#endif + } UDPC_IPV6_SOCKADDR_TYPE destinationInfo; destinationInfo.sin6_family = AF_INET6; - std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); + std::memcpy( + UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), + UDPC_IPV6_ADDR_SUB(iter->first.addr), + 16); destinationInfo.sin6_port = htons(iter->second.port); destinationInfo.sin6_flowinfo = 0; destinationInfo.sin6_scope_id = iter->first.scope_id; @@ -788,239 +631,407 @@ void UDPC::Context::update_impl() { if(sentBytes != sendSize) { UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "Failed to send packet to initiate connection to ", + "Failed to send disconnect packet to ", UDPC_atostr((UDPC_HContext)this, iter->first.addr), ", port = ", iter->second.port); continue; } - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG, - "Sent init pkt to client ", - UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr), - ", port ", iter->second.port); - } - continue; - } - - // Not initiating connection, send as normal on current connection - if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) { - // nothing in queues, send heartbeat packet - auto sentDT = now - iter->second.sent; - if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) { continue; } - unsigned int sendSize = 0; - std::unique_ptr buf; - if(flags.test(2) && iter->second.flags.test(6)) { - sendSize = UDPC_LSFULL_HEADER_SIZE; - buf = std::unique_ptr(new char[sendSize]); - *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; - } else { - sendSize = UDPC_NSFULL_HEADER_SIZE; - buf = std::unique_ptr(new char[sendSize]); - *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; - } - UDPC::preparePacket( - buf.get(), - protocolID, - iter->second.id, - iter->second.rseq, - iter->second.ack, - &iter->second.lseq, - 0); - if(flags.test(2) && iter->second.flags.test(6)) { + // clear triggerSend flag + iter->second.flags.reset(0); + + if(iter->second.flags.test(3)) { + if(flags.test(1)) { + // is initiating connection to server + auto initDT = now - iter->second.sent; + if(initDT < UDPC::INIT_PKT_INTERVAL_DT) { + continue; + } + iter->second.sent = now; + + std::unique_ptr buf; + unsigned int sendSize = 0; + if(flags.test(2) && iter->second.flags.test(6)) { #ifdef UDPC_LIBSODIUM_ENABLED - unsigned char sig[crypto_sign_BYTES]; - std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); - if(crypto_sign_detached( - sig, nullptr, - (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE, - iter->second.sk) != 0) { + sendSize = UDPC_CCL_HEADER_SIZE; + buf = std::unique_ptr(new char[sendSize]); + // set type 1 + *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1); + // set public key + std::memcpy( + buf.get() + UDPC_MIN_HEADER_SIZE + 4, + iter->second.pk, + crypto_sign_PUBLICKEYBYTES); + // set verify message + std::time_t time = std::time(nullptr); + if(time <= 0) { + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "Failed to get current epoch time"); + continue; + } + uint64_t timeInt = time; +# ifndef NDEBUG + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG, + "Client set up verification epoch time \"", + timeInt, "\""); +# endif + UDPC::be64((char*)&timeInt); + iter->second.verifyMessage = + std::unique_ptr(new char[8]); + std::memcpy( + iter->second.verifyMessage.get(), + &timeInt, + 8); + std::memcpy( + buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES, + &timeInt, + 8); +#else + assert(!"libsodium is disabled, invalid state"); + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "libsodium is disabled, cannot send packet"); + continue; +#endif + } else { + sendSize = UDPC_CON_HEADER_SIZE; + buf = std::unique_ptr(new char[sendSize]); + *((uint32_t*)(buf.get() + 20)) = 0; + } + UDPC::preparePacket( + buf.get(), + protocolID, + 0, + 0, + 0xFFFFFFFF, + nullptr, + 0x1); + + UDPC_IPV6_SOCKADDR_TYPE destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); + destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = iter->first.scope_id; + long int sentBytes = sendto( + socketHandle, + buf.get(), + sendSize, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(UDPC_IPV6_SOCKADDR_TYPE)); + if(sentBytes != sendSize) { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_ERROR, + "Failed to send packet to initiate connection to ", + UDPC_atostr((UDPC_HContext)this, iter->first.addr), + ", port = ", + iter->second.port); + continue; + } else { + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ", + UDPC_atostr((UDPC_HContext)this, iter->first.addr), + ", port = ", iter->second.port, + flags.test(2) && iter->second.flags.test(6) ? + ", libsodium enabled" : ", libsodium disabled"); + } + } else { + // is server, initiate connection to client + iter->second.flags.reset(3); + iter->second.sent = now; + + std::unique_ptr buf; + unsigned int sendSize = 0; + if(flags.test(2) && iter->second.flags.test(6)) { +#ifdef UDPC_LIBSODIUM_ENABLED + sendSize = UDPC_CSR_HEADER_SIZE; + buf = std::unique_ptr(new char[sendSize]); + // set type + *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2); + // set pubkey + std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4, + iter->second.pk, + crypto_sign_PUBLICKEYBYTES); + // set detached sig + assert(iter->second.verifyMessage && + "Detached sig in verifyMessage must exist"); + std::memcpy( + buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES, + iter->second.verifyMessage.get(), + crypto_sign_BYTES); +#else + assert(!"libsodium disabled, invalid state"); + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "libsodium is disabled, cannot send packet"); + continue; +#endif + } else { + sendSize = UDPC_CON_HEADER_SIZE; + buf = std::unique_ptr(new char[sendSize]); + *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; + } + UDPC::preparePacket( + buf.get(), + protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + 0x1); + + UDPC_IPV6_SOCKADDR_TYPE destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); + destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = iter->first.scope_id; + long int sentBytes = sendto( + socketHandle, + buf.get(), + sendSize, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(UDPC_IPV6_SOCKADDR_TYPE)); + if(sentBytes != sendSize) { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_ERROR, + "Failed to send packet to initiate connection to ", + UDPC_atostr((UDPC_HContext)this, iter->first.addr), + ", port = ", + iter->second.port); + continue; + } + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG, + "Sent init pkt to client ", + UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr), + ", port ", iter->second.port); + } + continue; + } + + // Not initiating connection, send as normal on current connection + if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) { + // nothing in queues, send heartbeat packet + auto sentDT = now - iter->second.sent; + if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) { + continue; + } + + unsigned int sendSize = 0; + std::unique_ptr buf; + if(flags.test(2) && iter->second.flags.test(6)) { + sendSize = UDPC_LSFULL_HEADER_SIZE; + buf = std::unique_ptr(new char[sendSize]); + *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; + } else { + sendSize = UDPC_NSFULL_HEADER_SIZE; + buf = std::unique_ptr(new char[sendSize]); + *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; + } + UDPC::preparePacket( + buf.get(), + protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + 0); + if(flags.test(2) && iter->second.flags.test(6)) { +#ifdef UDPC_LIBSODIUM_ENABLED + unsigned char sig[crypto_sign_BYTES]; + std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); + if(crypto_sign_detached( + sig, nullptr, + (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE, + iter->second.sk) != 0) { + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "Failed to sign packet for peer ", + UDPC_atostr((UDPC_HContext)this, iter->first.addr), + ", port ", + iter->second.port); + continue; + } + std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES); +#else + assert(!"libsodium disabled, invalid state"); UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "Failed to sign packet for peer ", + "libsodium is disabled, cannot send packet"); + continue; +#endif + } + + UDPC_IPV6_SOCKADDR_TYPE destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy( + UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), + UDPC_IPV6_ADDR_SUB(iter->first.addr), + 16); + destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = iter->first.scope_id; + long int sentBytes = sendto( + socketHandle, + buf.get(), + sendSize, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(UDPC_IPV6_SOCKADDR_TYPE)); + if(sentBytes != sendSize) { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_ERROR, + "Failed to send heartbeat packet to ", UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port ", + ", port = ", iter->second.port); continue; } - std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES); -#else - assert(!"libsodium disabled, invalid state"); - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "libsodium is disabled, cannot send packet"); - continue; -#endif - } - UDPC_IPV6_SOCKADDR_TYPE destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy( - UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), - UDPC_IPV6_ADDR_SUB(iter->first.addr), - 16); - destinationInfo.sin6_port = htons(iter->second.port); - destinationInfo.sin6_flowinfo = 0; - destinationInfo.sin6_scope_id = iter->first.scope_id; - long int sentBytes = sendto( - socketHandle, - buf.get(), - sendSize, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(UDPC_IPV6_SOCKADDR_TYPE)); - if(sentBytes != sendSize) { - UDPC_CHECK_LOG(this, - UDPC_LoggingType::UDPC_ERROR, - "Failed to send heartbeat packet to ", - UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port = ", - iter->second.port); - continue; - } + UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); + pInfo.flags = 0x4; + pInfo.sender.addr = in6addr_loopback; + pInfo.receiver.addr = iter->first.addr; + pInfo.sender.port = ntohs(socketInfo.sin6_port); + pInfo.receiver.port = iter->second.port; + *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1); + pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0; - UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); - pInfo.flags = 0x4; - pInfo.sender.addr = in6addr_loopback; - pInfo.receiver.addr = iter->first.addr; - pInfo.sender.port = ntohs(socketInfo.sin6_port); - pInfo.receiver.port = iter->second.port; - *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1); - pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0; + iter->second.sentPkts.push_back(std::move(pInfo)); + iter->second.cleanupSentPkts(); - iter->second.sentPkts.push_back(std::move(pInfo)); - iter->second.cleanupSentPkts(); - - // store other pkt info - UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); - sentPktInfo->id = iter->second.lseq - 1; - iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); - } else { - // sendPkts or priorityPkts not empty - UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); - bool isResending = false; - if(!iter->second.priorityPkts.empty()) { - pInfo = iter->second.priorityPkts.front(); - iter->second.priorityPkts.pop_front(); - isResending = true; + // store other pkt info + UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); + sentPktInfo->id = iter->second.lseq - 1; + iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); } else { - pInfo = iter->second.sendPkts.front(); - iter->second.sendPkts.pop_front(); - } + // sendPkts or priorityPkts not empty + UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); + bool isResending = false; + if(!iter->second.priorityPkts.empty()) { + pInfo = iter->second.priorityPkts.front(); + iter->second.priorityPkts.pop_front(); + isResending = true; + } else { + pInfo = iter->second.sendPkts.front(); + iter->second.sendPkts.pop_front(); + } - std::unique_ptr buf; - unsigned int sendSize = 0; - if(flags.test(2) && iter->second.flags.test(6)) { - sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize; - buf = std::unique_ptr(new char[sendSize]); - *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; - } else { - sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize; - buf = std::unique_ptr(new char[sendSize]); - *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; - } + std::unique_ptr buf; + unsigned int sendSize = 0; + if(flags.test(2) && iter->second.flags.test(6)) { + sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize; + buf = std::unique_ptr(new char[sendSize]); + *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1; + } else { + sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize; + buf = std::unique_ptr(new char[sendSize]); + *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0; + } - UDPC::preparePacket( - buf.get(), - protocolID, - iter->second.id, - iter->second.rseq, - iter->second.ack, - &iter->second.lseq, - (pInfo.flags & 0x4) | (isResending ? 0x8 : 0)); + UDPC::preparePacket( + buf.get(), + protocolID, + iter->second.id, + iter->second.rseq, + iter->second.ack, + &iter->second.lseq, + (pInfo.flags & 0x4) | (isResending ? 0x8 : 0)); - if(flags.test(2) && iter->second.flags.test(6)) { + if(flags.test(2) && iter->second.flags.test(6)) { #ifdef UDPC_LIBSODIUM_ENABLED - unsigned char sig[crypto_sign_BYTES]; - std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); - std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize); - if(crypto_sign_detached( - sig, nullptr, - (unsigned char*)buf.get(), sendSize, - iter->second.sk) != 0) { + unsigned char sig[crypto_sign_BYTES]; + std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES); + std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize); + if(crypto_sign_detached( + sig, nullptr, + (unsigned char*)buf.get(), sendSize, + iter->second.sk) != 0) { + UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, + "Failed to sign packet for peer ", + UDPC_atostr((UDPC_HContext)this, iter->first.addr), + ", port ", + iter->second.port); + continue; + } + std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES); +#else + assert(!"libsodium disabled, invalid state"); UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "Failed to sign packet for peer ", + "libsodium is disabled, cannot send packet"); + continue; +#endif + } else { + std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize); + } + + + UDPC_IPV6_SOCKADDR_TYPE destinationInfo; + destinationInfo.sin6_family = AF_INET6; + std::memcpy( + UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), + UDPC_IPV6_ADDR_SUB(iter->first.addr), + 16); + destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = iter->first.scope_id; + long int sentBytes = sendto( + socketHandle, + buf.get(), + sendSize, + 0, + (struct sockaddr*) &destinationInfo, + sizeof(UDPC_IPV6_SOCKADDR_TYPE)); + if(sentBytes != sendSize) { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_ERROR, + "Failed to send packet to ", UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port ", + ", port = ", iter->second.port); continue; } - std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES); -#else - assert(!"libsodium disabled, invalid state"); - UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR, - "libsodium is disabled, cannot send packet"); - continue; -#endif - } else { - std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize); + + if((pInfo.flags & 0x4) == 0) { + // is check-received, store data in case packet gets lost + UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); + std::memcpy(sentPInfo.data, buf.get(), sendSize); + sentPInfo.flags = 0; + sentPInfo.dataSize = sendSize; + sentPInfo.sender.addr = in6addr_loopback; + sentPInfo.receiver.addr = iter->first.addr; + sentPInfo.sender.port = ntohs(socketInfo.sin6_port); + sentPInfo.receiver.port = iter->second.port; + + iter->second.sentPkts.push_back(std::move(sentPInfo)); + iter->second.cleanupSentPkts(); + } else { + // is not check-received, only id stored in data array + UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); + sentPInfo.flags = 0x4; + sentPInfo.dataSize = 0; + sentPInfo.sender.addr = in6addr_loopback; + sentPInfo.receiver.addr = iter->first.addr; + sentPInfo.sender.port = ntohs(socketInfo.sin6_port); + sentPInfo.receiver.port = iter->second.port; + *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1); + + iter->second.sentPkts.push_back(std::move(sentPInfo)); + iter->second.cleanupSentPkts(); + } + + // store other pkt info + UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); + sentPktInfo->id = iter->second.lseq - 1; + iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); } - - - UDPC_IPV6_SOCKADDR_TYPE destinationInfo; - destinationInfo.sin6_family = AF_INET6; - std::memcpy( - UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), - UDPC_IPV6_ADDR_SUB(iter->first.addr), - 16); - destinationInfo.sin6_port = htons(iter->second.port); - destinationInfo.sin6_flowinfo = 0; - destinationInfo.sin6_scope_id = iter->first.scope_id; - long int sentBytes = sendto( - socketHandle, - buf.get(), - sendSize, - 0, - (struct sockaddr*) &destinationInfo, - sizeof(UDPC_IPV6_SOCKADDR_TYPE)); - if(sentBytes != sendSize) { - UDPC_CHECK_LOG(this, - UDPC_LoggingType::UDPC_ERROR, - "Failed to send packet to ", - UDPC_atostr((UDPC_HContext)this, iter->first.addr), - ", port = ", - iter->second.port); - continue; - } - - if((pInfo.flags & 0x4) == 0) { - // is check-received, store data in case packet gets lost - UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); - std::memcpy(sentPInfo.data, buf.get(), sendSize); - sentPInfo.flags = 0; - sentPInfo.dataSize = sendSize; - sentPInfo.sender.addr = in6addr_loopback; - sentPInfo.receiver.addr = iter->first.addr; - sentPInfo.sender.port = ntohs(socketInfo.sin6_port); - sentPInfo.receiver.port = iter->second.port; - - iter->second.sentPkts.push_back(std::move(sentPInfo)); - iter->second.cleanupSentPkts(); - } else { - // is not check-received, only id stored in data array - UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo(); - sentPInfo.flags = 0x4; - sentPInfo.dataSize = 0; - sentPInfo.sender.addr = in6addr_loopback; - sentPInfo.receiver.addr = iter->first.addr; - sentPInfo.sender.port = ntohs(socketInfo.sin6_port); - sentPInfo.receiver.port = iter->second.port; - *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1); - - iter->second.sentPkts.push_back(std::move(sentPInfo)); - iter->second.cleanupSentPkts(); - } - - // store other pkt info - UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared(); - sentPktInfo->id = iter->second.lseq - 1; - iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); + iter->second.sent = now; } - iter->second.sent = now; } // remove queued for deletion for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) { + std::lock_guard lock(conMapMutex); auto iter = conMap.find(*delIter); if(iter != conMap.end()) { if(iter->second.flags.test(4)) { @@ -1166,6 +1177,7 @@ void UDPC::Context::update_impl() { if(isConnect && !isPing) { // is connect packet and is accepting new connections + std::lock_guard lock(conMapMutex); if(!flags.test(1) && conMap.find(identifier) == conMap.end() && isAcceptNewConnections.load()) { @@ -1397,6 +1409,7 @@ void UDPC::Context::update_impl() { return; } + std::lock_guard lock(conMapMutex); auto iter = conMap.find(identifier); if(iter == conMap.end() || iter->second.flags.test(3) || !iter->second.flags.test(4) || iter->second.id != conID) { @@ -1780,10 +1793,7 @@ void UDPC::threadedUpdate(Context *ctx) { decltype(now) nextNow; while(ctx->threadRunning.load()) { now = std::chrono::steady_clock::now(); - { - std::lock_guard lock(ctx->conMapMutex); - ctx->update_impl(); - } + ctx->update_impl(); nextNow = std::chrono::steady_clock::now(); std::this_thread::sleep_for(ctx->threadedSleepTime - (nextNow - now)); } @@ -2066,7 +2076,6 @@ void UDPC_update(UDPC_HContext ctx) { return; } - std::lock_guard lock(c->conMapMutex); c->update_impl(); }