]> git.seodisparate.com - UDPConnection/commitdiff
Add events, refactorings
authorStephen Seo <seo.disparate@gmail.com>
Mon, 11 Nov 2019 07:08:51 +0000 (16:08 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Mon, 11 Nov 2019 07:08:51 +0000 (16:08 +0900)
Added event system to lessen the use of the main mutex and instead use
thread safe data structures (TSLQueue). Also can enable and check events
during execution (connect, disconnect, good mode, bad mode).

Fixes and refactorings.

src/UDPC_Defines.hpp
src/UDPConnection.cpp
src/UDPConnection.h
src/test/UDPC_NetworkTest.cpp

index a60fec0b802703371a5df954b7b6a13779222277..785d9d59ec066520ad20e5f2e8f4c3cae1a9ea0e 100644 (file)
@@ -97,7 +97,7 @@ struct ConnectionData {
      * 4 - is id set
      * 5 - error initializing keys for public key encryption
      */
-    std::bitset<32> flags;
+    std::bitset<8> flags;
     uint32_t id;
     uint32_t lseq;
     uint32_t rseq;
@@ -183,8 +183,9 @@ public:
      * 0 - is threaded
      * 1 - is client
      */
-    std::bitset<32> flags;
+    std::bitset<8> flags;
     std::atomic_bool isAcceptNewConnections;
+    std::atomic_bool isReceivingEvents;
     std::atomic_uint32_t protocolID;
     std::atomic_uint_fast8_t loggingType;
     std::atomic_uint32_t atostrBufIndex;
@@ -202,6 +203,10 @@ public:
     std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
     TSLQueue<UDPC_PacketInfo> receivedPkts;
     TSLQueue<UDPC_PacketInfo> cSendPkts;
+    // handled internally
+    TSLQueue<UDPC_Event> internalEvents;
+    // handled via interface, if isReceivingEvents is true
+    TSLQueue<UDPC_Event> externalEvents;
 
     std::default_random_engine rng_engine;
 
index 481d8e93fe46509c20f2dbfd149c9d18242b7a6b..767431c1e0c738c573d375cd607d2e3164a78755 100644 (file)
@@ -163,6 +163,7 @@ UDPC::Context::Context(bool isThreaded) :
 _contextIdentifier(UDPC_CONTEXT_IDENTIFIER),
 flags(),
 isAcceptNewConnections(true),
+isReceivingEvents(false),
 protocolID(UDPC_DEFAULT_PROTOCOL_ID),
 #ifndef NDEBUG
 loggingType(UDPC_DEBUG),
@@ -225,6 +226,113 @@ void UDPC::Context::update_impl() {
     std::chrono::steady_clock::duration temp_dt_fs;
     lastUpdated = now;
 
+    // handle internalEvents
+    do {
+        auto optE = internalEvents.top_and_pop();
+        if(optE.has_value()) {
+            switch(optE.value().type) {
+            case UDPC_ET_REQUEST_CONNECT:
+            {
+                UDPC::ConnectionData newCon(
+                    false,
+                    this,
+                    optE.value().conId.addr,
+                    optE.value().conId.scope_id,
+                    optE.value().conId.port);
+                if(newCon.flags.test(5)) {
+                    UDPC_CHECK_LOG(this,
+                        UDPC_LoggingType::UDPC_ERROR,
+                        "Failed to init ConnectionData instance (libsodium "
+                        "init fail) while client establishing connection with ",
+                        UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr),
+                        " port ",
+                        optE.value().conId.port);
+                    continue;
+                }
+                newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
+
+                if(conMap.find(optE.value().conId) == conMap.end()) {
+                    conMap.insert(std::make_pair(
+                        optE.value().conId,
+                        std::move(newCon)));
+                    auto addrConIter = addrConMap.find(optE.value().conId.addr);
+                    if(addrConIter == addrConMap.end()) {
+                        auto insertResult = addrConMap.insert(std::make_pair(
+                            optE.value().conId.addr,
+                            std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}));
+                        assert(insertResult.second &&
+                            "new connection insert into addrConMap must not fail");
+                        addrConIter = insertResult.first;
+                    }
+                    addrConIter->second.insert(optE.value().conId);
+                    UDPC_CHECK_LOG(this,
+                        UDPC_LoggingType::UDPC_INFO,
+                        "Client initiating connection to ",
+                        UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr),
+                        " port ",
+                        optE.value().conId.port,
+                        " ...");
+                } else {
+                    UDPC_CHECK_LOG(this,
+                        UDPC_LoggingType::UDPC_WARNING,
+                        "Client initiate connection, already connected to peer ",
+                        UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr),
+                        " port ",
+                        optE.value().conId.port);
+                }
+            }
+                break;
+            case UDPC_ET_REQUEST_DISCONNECT:
+                if(optE.value().dropAllWithAddr != 0) {
+                    // drop all connections with same address
+                    auto addrConIter = addrConMap.find(optE.value().conId.addr);
+                    if(addrConIter != addrConMap.end()) {
+                        for(auto identIter = addrConIter->second.begin();
+                                identIter != addrConIter->second.end();
+                                ++identIter) {
+                            auto conIter = conMap.find(*identIter);
+                            assert(conIter != conMap.end() &&
+                                "conMap must have connection listed in addrConMap");
+                            if(conIter->second.flags.test(4)) {
+                                idMap.erase(conIter->second.id);
+                            }
+                            if(isReceivingEvents.load()) {
+                                externalEvents.push(UDPC_Event{
+                                    UDPC_ET_DISCONNECTED, conIter->first, false});
+                            }
+                            conMap.erase(conIter);
+                        }
+                        addrConMap.erase(addrConIter);
+                    }
+                } else {
+                    // drop only specific connection with addr and port
+                    auto iter = conMap.find(optE.value().conId);
+                    if(iter != conMap.end()) {
+                        if(iter->second.flags.test(4)) {
+                            idMap.erase(iter->second.id);
+                        }
+                        auto addrConIter = addrConMap.find(optE.value().conId.addr);
+                        if(addrConIter != addrConMap.end()) {
+                            addrConIter->second.erase(optE.value().conId);
+                            if(addrConIter->second.empty()) {
+                                addrConMap.erase(addrConIter);
+                            }
+                        }
+                        if(isReceivingEvents.load()) {
+                            externalEvents.push(UDPC_Event{
+                                UDPC_ET_DISCONNECTED, iter->first, false});
+                        }
+                        conMap.erase(iter);
+                    }
+                }
+                break;
+            default:
+                assert(!"internalEvents got invalid type");
+                break;
+            }
+        }
+    } while(!internalEvents.empty());
+
     {
         // check timed out, check good/bad mode with rtt, remove timed out
         std::vector<UDPC_ConnectionId> removed;
@@ -257,6 +365,10 @@ void UDPC::Context::update_impl() {
                     iter->second.toggleT *= 2;
                 }
                 iter->second.toggledTimer = std::chrono::steady_clock::duration::zero();
+                if(isReceivingEvents.load()) {
+                    externalEvents.push(UDPC_Event{
+                        UDPC_ET_BAD_MODE, iter->first, false});
+                }
             } else if(iter->second.flags.test(1)) {
                 // good mode, good rtt
                 if(iter->second.toggleTimer >= UDPC::TEN_SECONDS) {
@@ -279,6 +391,10 @@ void UDPC::Context::update_impl() {
                         ", port = ",
                         iter->second.port);
                     iter->second.flags.set(1);
+                    if(isReceivingEvents.load()) {
+                        externalEvents.push(UDPC_Event{
+                            UDPC_ET_GOOD_MODE, iter->first, false});
+                    }
                 }
             } else {
                 // bad mode, bad rtt
@@ -317,6 +433,10 @@ void UDPC::Context::update_impl() {
             if(cIter->second.flags.test(4)) {
                 idMap.erase(cIter->second.id);
             }
+            if(isReceivingEvents.load()) {
+                externalEvents.push(UDPC_Event{
+                    UDPC_ET_DISCONNECTED, *iter, false});
+            }
 
             conMap.erase(cIter);
         }
@@ -325,6 +445,7 @@ void UDPC::Context::update_impl() {
     // move queued in cSendPkts to existing connection's sendPkts
     {
         auto sendIter = cSendPkts.begin();
+        std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher> dropped;
         while(true) {
             auto next = sendIter.current();
             if(next) {
@@ -352,15 +473,18 @@ void UDPC::Context::update_impl() {
                         break;
                     }
                 } else {
-                    UDPC_CHECK_LOG(this,
-                        UDPC_LoggingType::UDPC_WARNING,
-                        "Dropped queued packet to ",
-                        UDPC_atostr(
-                            (UDPC_HContext)this,
-                            next.value().receiver.addr),
-                        ", port = ",
-                        next.value().receiver.port,
-                        " due to connection not existing");
+                    if(dropped.find(next.value().receiver) == dropped.end()) {
+                        UDPC_CHECK_LOG(this,
+                            UDPC_LoggingType::UDPC_WARNING,
+                            "Dropped queued packets to ",
+                            UDPC_atostr(
+                                (UDPC_HContext)this,
+                                next.value().receiver.addr),
+                            ", port = ",
+                            next.value().receiver.port,
+                            " due to connection not existing");
+                        dropped.insert(next.value().receiver);
+                    }
                     if(sendIter.remove()) {
                         continue;
                     } else {
@@ -749,7 +873,12 @@ void UDPC::Context::update_impl() {
                 addrConIter = insertResult.first;
             }
             addrConIter->second.insert(identifier);
-            // TODO trigger event server established connection with client
+            if(isReceivingEvents.load()) {
+                externalEvents.push(UDPC_Event{
+                    UDPC_ET_CONNECTED,
+                    identifier,
+                    false});
+            }
         } else if (flags.test(1)) {
             // is client
             auto iter = conMap.find(identifier);
@@ -768,7 +897,12 @@ void UDPC::Context::update_impl() {
                 ", port = ",
                 ntohs(receivedData.sin6_port),
                 ", got id = ", conID);
-            // TODO trigger event client established connection with server
+            if(isReceivingEvents.load()) {
+                externalEvents.push(UDPC_Event{
+                    UDPC_ET_CONNECTED,
+                    identifier,
+                    false});
+            }
         }
         return;
     }
@@ -1227,41 +1361,7 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
         return;
     }
 
-    UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Got peer a = ",
-        UDPC_atostr((UDPC_HContext)ctx, connectionId.addr),
-        ", p = ", connectionId.port);
-
-    std::lock_guard<std::mutex> lock(c->mutex);
-
-    UDPC::ConnectionData newCon(false, c, connectionId.addr, connectionId.scope_id, connectionId.port);
-    if(newCon.flags.test(5)) {
-        UDPC_CHECK_LOG(c,
-            UDPC_LoggingType::UDPC_ERROR,
-            "Failed to init ConnectionData instance (libsodium init "
-            "fail) while client establishing connection with ",
-            UDPC_atostr((UDPC_HContext)c, connectionId.addr),
-            ", port = ",
-            connectionId.port);
-        return;
-    }
-    newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
-
-    if(c->conMap.find(connectionId) == c->conMap.end()) {
-        c->conMap.insert(std::make_pair(connectionId, std::move(newCon)));
-        auto addrConIter = c->addrConMap.find(connectionId.addr);
-        if(addrConIter == c->addrConMap.end()) {
-            auto insertResult = c->addrConMap.insert(std::make_pair(
-                    connectionId.addr,
-                    std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}
-                ));
-            assert(insertResult.second);
-            addrConIter = insertResult.first;
-        }
-        addrConIter->second.insert(connectionId);
-        UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Initiating connection...");
-    } else {
-        UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_ERROR, "client_initiate_connection: Already connected to peer");
-    }
+    c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, false});
 }
 
 void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
@@ -1304,49 +1404,14 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
     return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true);
 }
 
-int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr) {
+void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int dropAllWithAddr) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
-        return 0;
-    }
-
-    std::lock_guard<std::mutex> lock(c->mutex);
-
-    if(dropAllWithAddr) {
-        auto addrConIter = c->addrConMap.find(connectionId.addr);
-        if(addrConIter != c->addrConMap.end()) {
-            for(auto identIter = addrConIter->second.begin();
-                    identIter != addrConIter->second.end();
-                    ++identIter) {
-                auto conIter = c->conMap.find(*identIter);
-                assert(conIter != c->conMap.end());
-                if(conIter->second.flags.test(4)) {
-                    c->idMap.erase(conIter->second.id);
-                }
-                c->conMap.erase(conIter);
-            }
-            c->addrConMap.erase(addrConIter);
-            return 1;
-        }
-    } else {
-        auto iter = c->conMap.find(connectionId);
-        if(iter != c->conMap.end()) {
-            if(iter->second.flags.test(4)) {
-                c->idMap.erase(iter->second.id);
-            }
-            auto addrConIter = c->addrConMap.find(connectionId.addr);
-            if(addrConIter != c->addrConMap.end()) {
-                addrConIter->second.erase(connectionId);
-                if(addrConIter->second.empty()) {
-                    c->addrConMap.erase(addrConIter);
-                }
-            }
-            c->conMap.erase(iter);
-            return 1;
-        }
+        return;
     }
 
-    return 0;
+    c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
+    return;
 }
 
 int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
@@ -1390,15 +1455,32 @@ void UDPC_free_list_connected(UDPC_ConnectionId *list) {
     std::free(list);
 }
 
+uint32_t UDPC_get_protocol_id(UDPC_HContext ctx) {
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return 0;
+    }
+
+    return c->protocolID.load();
+}
+
 uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
         return 0;
     }
-    std::lock_guard<std::mutex> lock(c->mutex);
+
     return c->protocolID.exchange(id);
 }
 
+UDPC_LoggingType UDPC_get_logging_type(UDPC_HContext ctx) {
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return UDPC_LoggingType::UDPC_SILENT;
+    }
+    return static_cast<UDPC_LoggingType>(c->loggingType.load());
+}
+
 UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
@@ -1407,6 +1489,38 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi
     return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
 }
 
+int UDPC_get_receiving_events(UDPC_HContext ctx) {
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return 0;
+    }
+
+    return c->isReceivingEvents.load() ? 1 : 0;
+}
+
+int UDPC_set_receiving_events(UDPC_HContext ctx, int isReceivingEvents) {
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return 0;
+    }
+
+    return c->isReceivingEvents.exchange(isReceivingEvents != 0);
+}
+
+UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining) {
+    UDPC::Context *c = UDPC::verifyContext(ctx);
+    if(!c) {
+        return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
+    }
+
+    auto optE = c->externalEvents.top_and_pop_and_rsize(remaining);
+    if(optE) {
+        return optE.value();
+    } else {
+        return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
+    }
+}
+
 UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) {
     UDPC::Context *c = UDPC::verifyContext(ctx);
     if(!c) {
index c5c7bae72ec71009e29dd8afb11170d1fec0c553..28f35360312567cca504e01d5bef0f7a4839a090 100644 (file)
@@ -88,6 +88,22 @@ typedef struct {
     UDPC_ConnectionId receiver;
 } UDPC_PacketInfo;
 
+typedef enum {
+    UDPC_ET_NONE,
+    UDPC_ET_REQUEST_CONNECT,
+    UDPC_ET_REQUEST_DISCONNECT,
+    UDPC_ET_CONNECTED,
+    UDPC_ET_DISCONNECTED,
+    UDPC_ET_GOOD_MODE,
+    UDPC_ET_BAD_MODE
+} UDPC_EventType;
+
+typedef struct {
+    UDPC_EventType type;
+    UDPC_ConnectionId conId;
+    int dropAllWithAddr;
+} UDPC_Event;
+
 /// port should be in native byte order (not network/big-endian)
 UDPC_ConnectionId UDPC_create_id(UDPC_IPV6_ADDR_TYPE addr, uint16_t port);
 
@@ -112,7 +128,7 @@ unsigned long UDPC_get_queue_send_current_size(UDPC_HContext ctx);
 
 int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting);
 
-int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr);
+void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int dropAllWithAddr);
 
 int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
 
@@ -120,10 +136,20 @@ UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size
 
 void UDPC_free_list_connected(UDPC_ConnectionId *list);
 
+uint32_t UDPC_get_protocol_id(UDPC_HContext ctx);
+
 uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id);
 
+UDPC_LoggingType UDPC_get_logging_type(UDPC_HContext ctx);
+
 UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
 
+int UPDC_get_receiving_events(UDPC_HContext ctx);
+
+int UDPC_set_receiving_events(UDPC_HContext ctx, int isReceivingEvents);
+
+UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining);
+
 UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining);
 
 const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
index 876cf525cf7652351da0fe898262160eb3494e74..049311a3220126875150dd31b0e807fe95178505 100644 (file)
@@ -21,6 +21,7 @@ void usage() {
     puts("-t <tick_count>");
     puts("-n - do not add payload to packets");
     puts("-l (silent|error|warning|info|verbose|debug) - log level, default debug");
+    puts("-e - enable receiving events");
 }
 
 int main(int argc, char **argv) {
@@ -38,6 +39,7 @@ int main(int argc, char **argv) {
     unsigned int tickLimit = 15;
     bool noPayload = false;
     UDPC_LoggingType logLevel = UDPC_LoggingType::UDPC_DEBUG;
+    bool isReceivingEvents = false;
     while(argc > 0) {
         if(std::strcmp(argv[0], "-c") == 0) {
             isClient = true;
@@ -82,6 +84,9 @@ int main(int argc, char **argv) {
                 usage();
                 return 1;
             }
+        } else if(std::strcmp(argv[0], "-e") == 0) {
+            isReceivingEvents = true;
+            puts("Enabled isReceivingEvents");
         } else {
             printf("ERROR: invalid argument \"%s\"\n", argv[0]);
             usage();
@@ -131,6 +136,7 @@ int main(int argc, char **argv) {
         return 1;
     }
     UDPC_set_logging_type(context, logLevel);
+    UDPC_set_receiving_events(context, isReceivingEvents ? 1 : 0);
     unsigned int tick = 0;
     unsigned int temp = 0;
     unsigned int temp2, temp3;
@@ -138,6 +144,7 @@ int main(int argc, char **argv) {
     UDPC_ConnectionId *list = nullptr;
     std::vector<unsigned int> sendIds;
     UDPC_PacketInfo received;
+    UDPC_Event event;
     while(true) {
         std::this_thread::sleep_for(std::chrono::seconds(1));
         if(isClient && UDPC_has_connection(context, connectionId) == 0) {
@@ -169,6 +176,34 @@ int main(int argc, char **argv) {
                 }
             } while (size > 0);
         }
+        do {
+            event = UDPC_get_event(context, &size);
+            if(event.type == UDPC_ET_NONE) {
+                break;
+            }
+            const char *typeString;
+            switch(event.type) {
+            case UDPC_ET_CONNECTED:
+                typeString = "CONNECTED";
+                break;
+            case UDPC_ET_DISCONNECTED:
+                typeString = "DISCONNECTED";
+                break;
+            case UDPC_ET_GOOD_MODE:
+                typeString = "GOOD_MODE";
+                break;
+            case UDPC_ET_BAD_MODE:
+                typeString = "BAD_MODE";
+                break;
+            default:
+                typeString = "INVALID_TYPE";
+                break;
+            }
+            printf("Got event %s: %s %u\n",
+                typeString,
+                UDPC_atostr(context, event.conId.addr),
+                event.conId.port);
+        } while(size > 0);
         if(tick++ > tickLimit) {
             break;
         }