]> git.seodisparate.com - UDPConnection/commitdiff
Refactor locking mutex during update
authorStephen Seo <seo.disparate@gmail.com>
Sun, 5 Jan 2020 06:05:23 +0000 (15:05 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Sun, 5 Jan 2020 06:05:23 +0000 (15:05 +0900)
src/UDPConnection.cpp

index 73a8fe8931f74975c9595ffcd5638a1bbb1233ca..3d62434888323b842b0ff342812a633c0b997bb6 100644 (file)
@@ -317,6 +317,8 @@ void UDPC::Context::update_impl() {
                 }
                 newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
 
+
+                std::lock_guard<std::mutex> lock(conMapMutex);
                 if(conMap.find(optE->conId) == conMap.end()) {
                     conMap.insert(std::make_pair(
                         optE->conId,
@@ -349,6 +351,8 @@ void UDPC::Context::update_impl() {
             }
                 break;
             case UDPC_ET_REQUEST_DISCONNECT:
+            {
+                std::lock_guard<std::mutex> lock(conMapMutex);
                 if(optE->v.dropAllWithAddr != 0) {
                     // drop all connections with same address
                     auto addrConIter = addrConMap.find(optE->conId.addr);
@@ -369,6 +373,7 @@ void UDPC::Context::update_impl() {
                         deletionMap.insert(iter->first);
                     }
                 }
+            }
                 break;
             default:
                 assert(!"internalEvents got invalid type");
@@ -380,6 +385,7 @@ void UDPC::Context::update_impl() {
     {
         // check timed out, check good/bad mode with rtt, remove timed out
         std::vector<UDPC_ConnectionId> removed;
+        std::lock_guard<std::mutex> lock(conMapMutex);
         for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
             temp_dt_fs = now - iter->second.received;
             if(temp_dt_fs >= UDPC::CONNECTION_TIMEOUT) {
@@ -499,6 +505,7 @@ void UDPC::Context::update_impl() {
         while(true) {
             auto next = sendIter.current();
             if(next) {
+                std::lock_guard<std::mutex> lock(conMapMutex);
                 auto iter = conMap.find(next->receiver);
                 if(iter != conMap.end()) {
                     if(iter->second.sendPkts.size() >= UDPC_QUEUED_PKTS_MAX_SIZE) {
@@ -551,158 +558,316 @@ void UDPC::Context::update_impl() {
     }
 
     // update send (only if triggerSend flag is set)
-    for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
-        auto delIter = deletionMap.find(iter->first);
-        if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
-            continue;
-        } else if(delIter != deletionMap.end()) {
-            if(iter->second.flags.test(3)) {
-                // not initiated connection yet, no need to send disconnect pkt
+    {
+        std::lock_guard<std::mutex> lock(conMapMutex);
+        for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
+            auto delIter = deletionMap.find(iter->first);
+            if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
                 continue;
-            }
-            unsigned int sendSize = 0;
-            std::unique_ptr<char[]> buf;
-            if(flags.test(2) && iter->second.flags.test(6)) {
-                sendSize = UDPC_LSFULL_HEADER_SIZE;
-                buf = std::unique_ptr<char[]>(new char[sendSize]);
-                *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
-            } else {
-                sendSize = UDPC_NSFULL_HEADER_SIZE;
-                buf = std::unique_ptr<char[]>(new char[sendSize]);
-                *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
-            }
-            UDPC::preparePacket(
-                buf.get(),
-                protocolID,
-                iter->second.id,
-                iter->second.rseq,
-                iter->second.ack,
-                &iter->second.lseq,
-                0x3);
-            if(flags.test(2) && iter->second.flags.test(6)) {
+            } else if(delIter != deletionMap.end()) {
+                if(iter->second.flags.test(3)) {
+                    // not initiated connection yet, no need to send disconnect pkt
+                    continue;
+                }
+                unsigned int sendSize = 0;
+                std::unique_ptr<char[]> buf;
+                if(flags.test(2) && iter->second.flags.test(6)) {
+                    sendSize = UDPC_LSFULL_HEADER_SIZE;
+                    buf = std::unique_ptr<char[]>(new char[sendSize]);
+                    *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
+                } else {
+                    sendSize = UDPC_NSFULL_HEADER_SIZE;
+                    buf = std::unique_ptr<char[]>(new char[sendSize]);
+                    *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+                }
+                UDPC::preparePacket(
+                    buf.get(),
+                    protocolID,
+                    iter->second.id,
+                    iter->second.rseq,
+                    iter->second.ack,
+                    &iter->second.lseq,
+                    0x3);
+                if(flags.test(2) && iter->second.flags.test(6)) {
 #ifdef UDPC_LIBSODIUM_ENABLED
-                unsigned char sig[crypto_sign_BYTES];
-                std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
-                if(crypto_sign_detached(
-                    sig, nullptr,
-                    (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
-                    iter->second.sk) != 0) {
+                    unsigned char sig[crypto_sign_BYTES];
+                    std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
+                    if(crypto_sign_detached(
+                        sig, nullptr,
+                        (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
+                        iter->second.sk) != 0) {
+                        UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+                            "Failed to sign packet for peer ",
+                            UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+                            ", port ",
+                            iter->second.port);
+                        continue;
+                    }
+                    std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
+#else
+                    assert(!"libsodium disabled, invalid state");
                     UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                        "Failed to sign packet for peer ",
+                        "libsodium is disabled, cannot send packet");
+                    continue;
+#endif
+                }
+
+                UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
+                destinationInfo.sin6_family = AF_INET6;
+                std::memcpy(
+                    UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+                    UDPC_IPV6_ADDR_SUB(iter->first.addr),
+                    16);
+                destinationInfo.sin6_port = htons(iter->second.port);
+                destinationInfo.sin6_flowinfo = 0;
+                destinationInfo.sin6_scope_id = iter->first.scope_id;
+                long int sentBytes = sendto(
+                    socketHandle,
+                    buf.get(),
+                    sendSize,
+                    0,
+                    (struct sockaddr*) &destinationInfo,
+                    sizeof(UDPC_IPV6_SOCKADDR_TYPE));
+                if(sentBytes != sendSize) {
+                    UDPC_CHECK_LOG(this,
+                        UDPC_LoggingType::UDPC_ERROR,
+                        "Failed to send disconnect packet to ",
                         UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                        ", port ",
+                        ", port ",
                         iter->second.port);
                     continue;
                 }
-                std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
-#else
-                assert(!"libsodium disabled, invalid state");
-                UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                    "libsodium is disabled, cannot send packet");
                 continue;
-#endif
             }
 
-            UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
-            destinationInfo.sin6_family = AF_INET6;
-            std::memcpy(
-                UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
-                UDPC_IPV6_ADDR_SUB(iter->first.addr),
-                16);
-            destinationInfo.sin6_port = htons(iter->second.port);
-            destinationInfo.sin6_flowinfo = 0;
-            destinationInfo.sin6_scope_id = iter->first.scope_id;
-            long int sentBytes = sendto(
-                socketHandle,
-                buf.get(),
-                sendSize,
-                0,
-                (struct sockaddr*) &destinationInfo,
-                sizeof(UDPC_IPV6_SOCKADDR_TYPE));
-            if(sentBytes != sendSize) {
-                UDPC_CHECK_LOG(this,
-                    UDPC_LoggingType::UDPC_ERROR,
-                    "Failed to send disconnect packet to ",
-                    UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                    ", port = ",
-                    iter->second.port);
+            // clear triggerSend flag
+            iter->second.flags.reset(0);
+
+            if(iter->second.flags.test(3)) {
+                if(flags.test(1)) {
+                    // is initiating connection to server
+                    auto initDT = now - iter->second.sent;
+                    if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
+                        continue;
+                    }
+                    iter->second.sent = now;
+
+                    std::unique_ptr<char[]> buf;
+                    unsigned int sendSize = 0;
+                    if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+                        sendSize = UDPC_CCL_HEADER_SIZE;
+                        buf = std::unique_ptr<char[]>(new char[sendSize]);
+                        // set type 1
+                        *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
+                        // set public key
+                        std::memcpy(
+                            buf.get() + UDPC_MIN_HEADER_SIZE + 4,
+                            iter->second.pk,
+                            crypto_sign_PUBLICKEYBYTES);
+                        // set verify message
+                        std::time_t time = std::time(nullptr);
+                        if(time <= 0) {
+                            UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+                                "Failed to get current epoch time");
+                            continue;
+                        }
+                        uint64_t timeInt = time;
+# ifndef NDEBUG
+                        UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
+                            "Client set up verification epoch time \"",
+                            timeInt, "\"");
+# endif
+                        UDPC::be64((char*)&timeInt);
+                        iter->second.verifyMessage =
+                            std::unique_ptr<char[]>(new char[8]);
+                        std::memcpy(
+                            iter->second.verifyMessage.get(),
+                            &timeInt,
+                            8);
+                        std::memcpy(
+                            buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
+                            &timeInt,
+                            8);
+#else
+                        assert(!"libsodium is disabled, invalid state");
+                        UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+                            "libsodium is disabled, cannot send packet");
+                        continue;
+#endif
+                    } else {
+                        sendSize = UDPC_CON_HEADER_SIZE;
+                        buf = std::unique_ptr<char[]>(new char[sendSize]);
+                        *((uint32_t*)(buf.get() + 20)) = 0;
+                    }
+                    UDPC::preparePacket(
+                        buf.get(),
+                        protocolID,
+                        0,
+                        0,
+                        0xFFFFFFFF,
+                        nullptr,
+                        0x1);
+
+                    UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
+                    destinationInfo.sin6_family = AF_INET6;
+                    std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+                    destinationInfo.sin6_port = htons(iter->second.port);
+                    destinationInfo.sin6_flowinfo = 0;
+                    destinationInfo.sin6_scope_id = iter->first.scope_id;
+                    long int sentBytes = sendto(
+                        socketHandle,
+                        buf.get(),
+                        sendSize,
+                        0,
+                        (struct sockaddr*) &destinationInfo,
+                        sizeof(UDPC_IPV6_SOCKADDR_TYPE));
+                    if(sentBytes != sendSize) {
+                        UDPC_CHECK_LOG(this,
+                            UDPC_LoggingType::UDPC_ERROR,
+                            "Failed to send packet to initiate connection to ",
+                            UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+                            ", port = ",
+                            iter->second.port);
+                        continue;
+                    } else {
+                        UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ",
+                            UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+                            ", port = ", iter->second.port,
+                            flags.test(2) && iter->second.flags.test(6) ?
+                                ", libsodium enabled" : ", libsodium disabled");
+                    }
+                } else {
+                    // is server, initiate connection to client
+                    iter->second.flags.reset(3);
+                    iter->second.sent = now;
+
+                    std::unique_ptr<char[]> buf;
+                    unsigned int sendSize = 0;
+                    if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+                        sendSize = UDPC_CSR_HEADER_SIZE;
+                        buf = std::unique_ptr<char[]>(new char[sendSize]);
+                        // set type
+                        *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
+                        // set pubkey
+                        std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4,
+                            iter->second.pk,
+                            crypto_sign_PUBLICKEYBYTES);
+                        // set detached sig
+                        assert(iter->second.verifyMessage &&
+                            "Detached sig in verifyMessage must exist");
+                        std::memcpy(
+                            buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
+                            iter->second.verifyMessage.get(),
+                            crypto_sign_BYTES);
+#else
+                        assert(!"libsodium disabled, invalid state");
+                        UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+                            "libsodium is disabled, cannot send packet");
+                        continue;
+#endif
+                    } else {
+                        sendSize = UDPC_CON_HEADER_SIZE;
+                        buf = std::unique_ptr<char[]>(new char[sendSize]);
+                        *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+                    }
+                    UDPC::preparePacket(
+                        buf.get(),
+                        protocolID,
+                        iter->second.id,
+                        iter->second.rseq,
+                        iter->second.ack,
+                        &iter->second.lseq,
+                        0x1);
+
+                    UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
+                    destinationInfo.sin6_family = AF_INET6;
+                    std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+                    destinationInfo.sin6_port = htons(iter->second.port);
+                    destinationInfo.sin6_flowinfo = 0;
+                    destinationInfo.sin6_scope_id = iter->first.scope_id;
+                    long int sentBytes = sendto(
+                        socketHandle,
+                        buf.get(),
+                        sendSize,
+                        0,
+                        (struct sockaddr*) &destinationInfo,
+                        sizeof(UDPC_IPV6_SOCKADDR_TYPE));
+                    if(sentBytes != sendSize) {
+                        UDPC_CHECK_LOG(this,
+                            UDPC_LoggingType::UDPC_ERROR,
+                            "Failed to send packet to initiate connection to ",
+                            UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+                            ", port = ",
+                            iter->second.port);
+                        continue;
+                    }
+                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
+                        "Sent init pkt to client ",
+                        UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr),
+                        ", port ", iter->second.port);
+                }
                 continue;
             }
-            continue;
-        }
-
-        // clear triggerSend flag
-        iter->second.flags.reset(0);
 
-        if(iter->second.flags.test(3)) {
-            if(flags.test(1)) {
-                // is initiating connection to server
-                auto initDT = now - iter->second.sent;
-                if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
+            // Not initiating connection, send as normal on current connection
+            if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
+                // nothing in queues, send heartbeat packet
+                auto sentDT = now - iter->second.sent;
+                if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
                     continue;
                 }
-                iter->second.sent = now;
 
-                std::unique_ptr<char[]> buf;
                 unsigned int sendSize = 0;
+                std::unique_ptr<char[]> buf;
                 if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
-                    sendSize = UDPC_CCL_HEADER_SIZE;
+                    sendSize = UDPC_LSFULL_HEADER_SIZE;
+                    buf = std::unique_ptr<char[]>(new char[sendSize]);
+                    *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
+                } else {
+                    sendSize = UDPC_NSFULL_HEADER_SIZE;
                     buf = std::unique_ptr<char[]>(new char[sendSize]);
-                    // set type 1
-                    *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
-                    // set public key
-                    std::memcpy(
-                        buf.get() + UDPC_MIN_HEADER_SIZE + 4,
-                        iter->second.pk,
-                        crypto_sign_PUBLICKEYBYTES);
-                    // set verify message
-                    std::time_t time = std::time(nullptr);
-                    if(time <= 0) {
+                    *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+                }
+                UDPC::preparePacket(
+                    buf.get(),
+                    protocolID,
+                    iter->second.id,
+                    iter->second.rseq,
+                    iter->second.ack,
+                    &iter->second.lseq,
+                    0);
+                if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+                    unsigned char sig[crypto_sign_BYTES];
+                    std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
+                    if(crypto_sign_detached(
+                        sig, nullptr,
+                        (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
+                        iter->second.sk) != 0) {
                         UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                            "Failed to get current epoch time");
+                            "Failed to sign packet for peer ",
+                            UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+                            ", port ",
+                            iter->second.port);
                         continue;
                     }
-                    uint64_t timeInt = time;
-# ifndef NDEBUG
-                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
-                        "Client set up verification epoch time \"",
-                        timeInt, "\"");
-# endif
-                    UDPC::be64((char*)&timeInt);
-                    iter->second.verifyMessage =
-                        std::unique_ptr<char[]>(new char[8]);
-                    std::memcpy(
-                        iter->second.verifyMessage.get(),
-                        &timeInt,
-                        8);
-                    std::memcpy(
-                        buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
-                        &timeInt,
-                        8);
+                    std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
 #else
-                    assert(!"libsodium is disabled, invalid state");
+                    assert(!"libsodium disabled, invalid state");
                     UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
                         "libsodium is disabled, cannot send packet");
                     continue;
 #endif
-                } else {
-                    sendSize = UDPC_CON_HEADER_SIZE;
-                    buf = std::unique_ptr<char[]>(new char[sendSize]);
-                    *((uint32_t*)(buf.get() + 20)) = 0;
                 }
-                UDPC::preparePacket(
-                    buf.get(),
-                    protocolID,
-                    0,
-                    0,
-                    0xFFFFFFFF,
-                    nullptr,
-                    0x1);
 
                 UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
                 destinationInfo.sin6_family = AF_INET6;
-                std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+                std::memcpy(
+                    UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+                    UDPC_IPV6_ADDR_SUB(iter->first.addr),
+                    16);
                 destinationInfo.sin6_port = htons(iter->second.port);
                 destinationInfo.sin6_flowinfo = 0;
                 destinationInfo.sin6_scope_id = iter->first.scope_id;
@@ -716,53 +881,54 @@ void UDPC::Context::update_impl() {
                 if(sentBytes != sendSize) {
                     UDPC_CHECK_LOG(this,
                         UDPC_LoggingType::UDPC_ERROR,
-                        "Failed to send packet to initiate connection to ",
+                        "Failed to send heartbeat packet to ",
                         UDPC_atostr((UDPC_HContext)this, iter->first.addr),
                         ", port = ",
                         iter->second.port);
                     continue;
-                } else {
-                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_INFO, "Sent initiate connection to ",
-                        UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                        ", port = ", iter->second.port,
-                        flags.test(2) && iter->second.flags.test(6) ?
-                            ", libsodium enabled" : ", libsodium disabled");
                 }
+
+                UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
+                pInfo.flags = 0x4;
+                pInfo.sender.addr = in6addr_loopback;
+                pInfo.receiver.addr = iter->first.addr;
+                pInfo.sender.port = ntohs(socketInfo.sin6_port);
+                pInfo.receiver.port = iter->second.port;
+                *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
+                pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0;
+
+                iter->second.sentPkts.push_back(std::move(pInfo));
+                iter->second.cleanupSentPkts();
+
+                // store other pkt info
+                UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
+                sentPktInfo->id = iter->second.lseq - 1;
+                iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
             } else {
-                // is server, initiate connection to client
-                iter->second.flags.reset(3);
-                iter->second.sent = now;
+                // sendPkts or priorityPkts not empty
+                UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
+                bool isResending = false;
+                if(!iter->second.priorityPkts.empty()) {
+                    pInfo = iter->second.priorityPkts.front();
+                    iter->second.priorityPkts.pop_front();
+                    isResending = true;
+                } else {
+                    pInfo = iter->second.sendPkts.front();
+                    iter->second.sendPkts.pop_front();
+                }
 
                 std::unique_ptr<char[]> buf;
                 unsigned int sendSize = 0;
                 if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
-                    sendSize = UDPC_CSR_HEADER_SIZE;
+                    sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize;
                     buf = std::unique_ptr<char[]>(new char[sendSize]);
-                    // set type
-                    *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
-                    // set pubkey
-                    std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 4,
-                        iter->second.pk,
-                        crypto_sign_PUBLICKEYBYTES);
-                    // set detached sig
-                    assert(iter->second.verifyMessage &&
-                        "Detached sig in verifyMessage must exist");
-                    std::memcpy(
-                        buf.get() + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES,
-                        iter->second.verifyMessage.get(),
-                        crypto_sign_BYTES);
-#else
-                    assert(!"libsodium disabled, invalid state");
-                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                        "libsodium is disabled, cannot send packet");
-                    continue;
-#endif
+                    *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
                 } else {
-                    sendSize = UDPC_CON_HEADER_SIZE;
+                    sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize;
                     buf = std::unique_ptr<char[]>(new char[sendSize]);
-                    *((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
+                    *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
                 }
+
                 UDPC::preparePacket(
                     buf.get(),
                     protocolID,
@@ -770,11 +936,42 @@ void UDPC::Context::update_impl() {
                     iter->second.rseq,
                     iter->second.ack,
                     &iter->second.lseq,
-                    0x1);
+                    (pInfo.flags & 0x4) | (isResending ? 0x8 : 0));
+
+                if(flags.test(2) && iter->second.flags.test(6)) {
+#ifdef UDPC_LIBSODIUM_ENABLED
+                    unsigned char sig[crypto_sign_BYTES];
+                    std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
+                    std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
+                    if(crypto_sign_detached(
+                        sig, nullptr,
+                        (unsigned char*)buf.get(), sendSize,
+                        iter->second.sk) != 0) {
+                        UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+                            "Failed to sign packet for peer ",
+                            UDPC_atostr((UDPC_HContext)this, iter->first.addr),
+                            ", port ",
+                            iter->second.port);
+                        continue;
+                    }
+                    std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
+#else
+                    assert(!"libsodium disabled, invalid state");
+                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
+                        "libsodium is disabled, cannot send packet");
+                    continue;
+#endif
+                } else {
+                    std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
+                }
+
 
                 UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
                 destinationInfo.sin6_family = AF_INET6;
-                std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+                std::memcpy(
+                    UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+                    UDPC_IPV6_ADDR_SUB(iter->first.addr),
+                    16);
                 destinationInfo.sin6_port = htons(iter->second.port);
                 destinationInfo.sin6_flowinfo = 0;
                 destinationInfo.sin6_scope_id = iter->first.scope_id;
@@ -788,239 +985,53 @@ void UDPC::Context::update_impl() {
                 if(sentBytes != sendSize) {
                     UDPC_CHECK_LOG(this,
                         UDPC_LoggingType::UDPC_ERROR,
-                        "Failed to send packet to initiate connection to ",
+                        "Failed to send packet to ",
                         UDPC_atostr((UDPC_HContext)this, iter->first.addr),
                         ", port = ",
                         iter->second.port);
                     continue;
                 }
-                UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
-                    "Sent init pkt to client ",
-                    UDPC_atostr((UDPC_HContext)this, destinationInfo.sin6_addr),
-                    ", port ", iter->second.port);
-            }
-            continue;
-        }
-
-        // Not initiating connection, send as normal on current connection
-        if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
-            // nothing in queues, send heartbeat packet
-            auto sentDT = now - iter->second.sent;
-            if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
-                continue;
-            }
-
-            unsigned int sendSize = 0;
-            std::unique_ptr<char[]> buf;
-            if(flags.test(2) && iter->second.flags.test(6)) {
-                sendSize = UDPC_LSFULL_HEADER_SIZE;
-                buf = std::unique_ptr<char[]>(new char[sendSize]);
-                *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
-            } else {
-                sendSize = UDPC_NSFULL_HEADER_SIZE;
-                buf = std::unique_ptr<char[]>(new char[sendSize]);
-                *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
-            }
-            UDPC::preparePacket(
-                buf.get(),
-                protocolID,
-                iter->second.id,
-                iter->second.rseq,
-                iter->second.ack,
-                &iter->second.lseq,
-                0);
-            if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
-                unsigned char sig[crypto_sign_BYTES];
-                std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
-                if(crypto_sign_detached(
-                    sig, nullptr,
-                    (unsigned char*)buf.get(), UDPC_LSFULL_HEADER_SIZE,
-                    iter->second.sk) != 0) {
-                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                        "Failed to sign packet for peer ",
-                        UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                        ", port ",
-                        iter->second.port);
-                    continue;
-                }
-                std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
-#else
-                assert(!"libsodium disabled, invalid state");
-                UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                    "libsodium is disabled, cannot send packet");
-                continue;
-#endif
-            }
-
-            UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
-            destinationInfo.sin6_family = AF_INET6;
-            std::memcpy(
-                UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
-                UDPC_IPV6_ADDR_SUB(iter->first.addr),
-                16);
-            destinationInfo.sin6_port = htons(iter->second.port);
-            destinationInfo.sin6_flowinfo = 0;
-            destinationInfo.sin6_scope_id = iter->first.scope_id;
-            long int sentBytes = sendto(
-                socketHandle,
-                buf.get(),
-                sendSize,
-                0,
-                (struct sockaddr*) &destinationInfo,
-                sizeof(UDPC_IPV6_SOCKADDR_TYPE));
-            if(sentBytes != sendSize) {
-                UDPC_CHECK_LOG(this,
-                    UDPC_LoggingType::UDPC_ERROR,
-                    "Failed to send heartbeat packet to ",
-                    UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                    ", port = ",
-                    iter->second.port);
-                continue;
-            }
-
-            UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
-            pInfo.flags = 0x4;
-            pInfo.sender.addr = in6addr_loopback;
-            pInfo.receiver.addr = iter->first.addr;
-            pInfo.sender.port = ntohs(socketInfo.sin6_port);
-            pInfo.receiver.port = iter->second.port;
-            *((uint32_t*)(pInfo.data + 8)) = htonl(iter->second.lseq - 1);
-            pInfo.data[UDPC_MIN_HEADER_SIZE] = flags.test(2) && iter->second.flags.test(6) ? 1 : 0;
-
-            iter->second.sentPkts.push_back(std::move(pInfo));
-            iter->second.cleanupSentPkts();
-
-            // store other pkt info
-            UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
-            sentPktInfo->id = iter->second.lseq - 1;
-            iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
-        } else {
-            // sendPkts or priorityPkts not empty
-            UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
-            bool isResending = false;
-            if(!iter->second.priorityPkts.empty()) {
-                pInfo = iter->second.priorityPkts.front();
-                iter->second.priorityPkts.pop_front();
-                isResending = true;
-            } else {
-                pInfo = iter->second.sendPkts.front();
-                iter->second.sendPkts.pop_front();
-            }
-
-            std::unique_ptr<char[]> buf;
-            unsigned int sendSize = 0;
-            if(flags.test(2) && iter->second.flags.test(6)) {
-                sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize;
-                buf = std::unique_ptr<char[]>(new char[sendSize]);
-                *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
-            } else {
-                sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize;
-                buf = std::unique_ptr<char[]>(new char[sendSize]);
-                *((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
-            }
-
-            UDPC::preparePacket(
-                buf.get(),
-                protocolID,
-                iter->second.id,
-                iter->second.rseq,
-                iter->second.ack,
-                &iter->second.lseq,
-                (pInfo.flags & 0x4) | (isResending ? 0x8 : 0));
 
-            if(flags.test(2) && iter->second.flags.test(6)) {
-#ifdef UDPC_LIBSODIUM_ENABLED
-                unsigned char sig[crypto_sign_BYTES];
-                std::memset(buf.get() + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
-                std::memcpy(buf.get() + UDPC_LSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
-                if(crypto_sign_detached(
-                    sig, nullptr,
-                    (unsigned char*)buf.get(), sendSize,
-                    iter->second.sk) != 0) {
-                    UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                        "Failed to sign packet for peer ",
-                        UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                        ", port ",
-                        iter->second.port);
-                    continue;
+                if((pInfo.flags & 0x4) == 0) {
+                    // is check-received, store data in case packet gets lost
+                    UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
+                    std::memcpy(sentPInfo.data, buf.get(), sendSize);
+                    sentPInfo.flags = 0;
+                    sentPInfo.dataSize = sendSize;
+                    sentPInfo.sender.addr = in6addr_loopback;
+                    sentPInfo.receiver.addr = iter->first.addr;
+                    sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
+                    sentPInfo.receiver.port = iter->second.port;
+
+                    iter->second.sentPkts.push_back(std::move(sentPInfo));
+                    iter->second.cleanupSentPkts();
+                } else {
+                    // is not check-received, only id stored in data array
+                    UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
+                    sentPInfo.flags = 0x4;
+                    sentPInfo.dataSize = 0;
+                    sentPInfo.sender.addr = in6addr_loopback;
+                    sentPInfo.receiver.addr = iter->first.addr;
+                    sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
+                    sentPInfo.receiver.port = iter->second.port;
+                    *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
+
+                    iter->second.sentPkts.push_back(std::move(sentPInfo));
+                    iter->second.cleanupSentPkts();
                 }
-                std::memcpy(buf.get() + UDPC_MIN_HEADER_SIZE + 1, sig, crypto_sign_BYTES);
-#else
-                assert(!"libsodium disabled, invalid state");
-                UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
-                    "libsodium is disabled, cannot send packet");
-                continue;
-#endif
-            } else {
-                std::memcpy(buf.get() + UDPC_NSFULL_HEADER_SIZE, pInfo.data, pInfo.dataSize);
-            }
-
 
-            UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
-            destinationInfo.sin6_family = AF_INET6;
-            std::memcpy(
-                UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
-                UDPC_IPV6_ADDR_SUB(iter->first.addr),
-                16);
-            destinationInfo.sin6_port = htons(iter->second.port);
-            destinationInfo.sin6_flowinfo = 0;
-            destinationInfo.sin6_scope_id = iter->first.scope_id;
-            long int sentBytes = sendto(
-                socketHandle,
-                buf.get(),
-                sendSize,
-                0,
-                (struct sockaddr*) &destinationInfo,
-                sizeof(UDPC_IPV6_SOCKADDR_TYPE));
-            if(sentBytes != sendSize) {
-                UDPC_CHECK_LOG(this,
-                    UDPC_LoggingType::UDPC_ERROR,
-                    "Failed to send packet to ",
-                    UDPC_atostr((UDPC_HContext)this, iter->first.addr),
-                    ", port = ",
-                    iter->second.port);
-                continue;
-            }
-
-            if((pInfo.flags & 0x4) == 0) {
-                // is check-received, store data in case packet gets lost
-                UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
-                std::memcpy(sentPInfo.data, buf.get(), sendSize);
-                sentPInfo.flags = 0;
-                sentPInfo.dataSize = sendSize;
-                sentPInfo.sender.addr = in6addr_loopback;
-                sentPInfo.receiver.addr = iter->first.addr;
-                sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
-                sentPInfo.receiver.port = iter->second.port;
-
-                iter->second.sentPkts.push_back(std::move(sentPInfo));
-                iter->second.cleanupSentPkts();
-            } else {
-                // is not check-received, only id stored in data array
-                UDPC_PacketInfo sentPInfo = UDPC::get_empty_pinfo();
-                sentPInfo.flags = 0x4;
-                sentPInfo.dataSize = 0;
-                sentPInfo.sender.addr = in6addr_loopback;
-                sentPInfo.receiver.addr = iter->first.addr;
-                sentPInfo.sender.port = ntohs(socketInfo.sin6_port);
-                sentPInfo.receiver.port = iter->second.port;
-                *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
-
-                iter->second.sentPkts.push_back(std::move(sentPInfo));
-                iter->second.cleanupSentPkts();
+                // store other pkt info
+                UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
+                sentPktInfo->id = iter->second.lseq - 1;
+                iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
             }
-
-            // store other pkt info
-            UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
-            sentPktInfo->id = iter->second.lseq - 1;
-            iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
+            iter->second.sent = now;
         }
-        iter->second.sent = now;
     }
 
     // remove queued for deletion
     for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
+        std::lock_guard<std::mutex> lock(conMapMutex);
         auto iter = conMap.find(*delIter);
         if(iter != conMap.end()) {
             if(iter->second.flags.test(4)) {
@@ -1166,6 +1177,7 @@ void UDPC::Context::update_impl() {
 
     if(isConnect && !isPing) {
         // is connect packet and is accepting new connections
+        std::lock_guard<std::mutex> lock(conMapMutex);
         if(!flags.test(1)
                 && conMap.find(identifier) == conMap.end()
                 && isAcceptNewConnections.load()) {
@@ -1397,6 +1409,7 @@ void UDPC::Context::update_impl() {
         return;
     }
 
+    std::lock_guard<std::mutex> lock(conMapMutex);
     auto iter = conMap.find(identifier);
     if(iter == conMap.end() || iter->second.flags.test(3)
             || !iter->second.flags.test(4) || iter->second.id != conID) {
@@ -1780,10 +1793,7 @@ void UDPC::threadedUpdate(Context *ctx) {
     decltype(now) nextNow;
     while(ctx->threadRunning.load()) {
         now = std::chrono::steady_clock::now();
-        {
-            std::lock_guard<std::mutex> lock(ctx->conMapMutex);
-            ctx->update_impl();
-        }
+        ctx->update_impl();
         nextNow = std::chrono::steady_clock::now();
         std::this_thread::sleep_for(ctx->threadedSleepTime - (nextNow - now));
     }
@@ -2066,7 +2076,6 @@ void UDPC_update(UDPC_HContext ctx) {
         return;
     }
 
-    std::lock_guard<std::mutex> lock(c->conMapMutex);
     c->update_impl();
 }