From 7b5cf3b6f82a2a09f20e013798925f85f7906820 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Mon, 11 Nov 2019 16:08:51 +0900 Subject: [PATCH] Add events, refactorings Added event system to lessen the use of the main mutex and instead use thread safe data structures (TSLQueue). Also can enable and check events during execution (connect, disconnect, good mode, bad mode). Fixes and refactorings. --- src/UDPC_Defines.hpp | 9 +- src/UDPConnection.cpp | 286 ++++++++++++++++++++++++---------- src/UDPConnection.h | 28 +++- src/test/UDPC_NetworkTest.cpp | 35 +++++ 4 files changed, 269 insertions(+), 89 deletions(-) diff --git a/src/UDPC_Defines.hpp b/src/UDPC_Defines.hpp index a60fec0..785d9d5 100644 --- a/src/UDPC_Defines.hpp +++ b/src/UDPC_Defines.hpp @@ -97,7 +97,7 @@ struct ConnectionData { * 4 - is id set * 5 - error initializing keys for public key encryption */ - std::bitset<32> flags; + std::bitset<8> flags; uint32_t id; uint32_t lseq; uint32_t rseq; @@ -183,8 +183,9 @@ public: * 0 - is threaded * 1 - is client */ - std::bitset<32> flags; + std::bitset<8> flags; std::atomic_bool isAcceptNewConnections; + std::atomic_bool isReceivingEvents; std::atomic_uint32_t protocolID; std::atomic_uint_fast8_t loggingType; std::atomic_uint32_t atostrBufIndex; @@ -202,6 +203,10 @@ public: std::unordered_map idMap; TSLQueue receivedPkts; TSLQueue cSendPkts; + // handled internally + TSLQueue internalEvents; + // handled via interface, if isReceivingEvents is true + TSLQueue externalEvents; std::default_random_engine rng_engine; diff --git a/src/UDPConnection.cpp b/src/UDPConnection.cpp index 481d8e9..767431c 100644 --- a/src/UDPConnection.cpp +++ b/src/UDPConnection.cpp @@ -163,6 +163,7 @@ UDPC::Context::Context(bool isThreaded) : _contextIdentifier(UDPC_CONTEXT_IDENTIFIER), flags(), isAcceptNewConnections(true), +isReceivingEvents(false), protocolID(UDPC_DEFAULT_PROTOCOL_ID), #ifndef NDEBUG loggingType(UDPC_DEBUG), @@ -225,6 +226,113 @@ void UDPC::Context::update_impl() { std::chrono::steady_clock::duration temp_dt_fs; lastUpdated = now; + // handle internalEvents + do { + auto optE = internalEvents.top_and_pop(); + if(optE.has_value()) { + switch(optE.value().type) { + case UDPC_ET_REQUEST_CONNECT: + { + UDPC::ConnectionData newCon( + false, + this, + optE.value().conId.addr, + optE.value().conId.scope_id, + optE.value().conId.port); + if(newCon.flags.test(5)) { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_ERROR, + "Failed to init ConnectionData instance (libsodium " + "init fail) while client establishing connection with ", + UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr), + " port ", + optE.value().conId.port); + continue; + } + newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT; + + if(conMap.find(optE.value().conId) == conMap.end()) { + conMap.insert(std::make_pair( + optE.value().conId, + std::move(newCon))); + auto addrConIter = addrConMap.find(optE.value().conId.addr); + if(addrConIter == addrConMap.end()) { + auto insertResult = addrConMap.insert(std::make_pair( + optE.value().conId.addr, + std::unordered_set{})); + assert(insertResult.second && + "new connection insert into addrConMap must not fail"); + addrConIter = insertResult.first; + } + addrConIter->second.insert(optE.value().conId); + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_INFO, + "Client initiating connection to ", + UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr), + " port ", + optE.value().conId.port, + " ..."); + } else { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_WARNING, + "Client initiate connection, already connected to peer ", + UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr), + " port ", + optE.value().conId.port); + } + } + break; + case UDPC_ET_REQUEST_DISCONNECT: + if(optE.value().dropAllWithAddr != 0) { + // drop all connections with same address + auto addrConIter = addrConMap.find(optE.value().conId.addr); + if(addrConIter != addrConMap.end()) { + for(auto identIter = addrConIter->second.begin(); + identIter != addrConIter->second.end(); + ++identIter) { + auto conIter = conMap.find(*identIter); + assert(conIter != conMap.end() && + "conMap must have connection listed in addrConMap"); + if(conIter->second.flags.test(4)) { + idMap.erase(conIter->second.id); + } + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_DISCONNECTED, conIter->first, false}); + } + conMap.erase(conIter); + } + addrConMap.erase(addrConIter); + } + } else { + // drop only specific connection with addr and port + auto iter = conMap.find(optE.value().conId); + if(iter != conMap.end()) { + if(iter->second.flags.test(4)) { + idMap.erase(iter->second.id); + } + auto addrConIter = addrConMap.find(optE.value().conId.addr); + if(addrConIter != addrConMap.end()) { + addrConIter->second.erase(optE.value().conId); + if(addrConIter->second.empty()) { + addrConMap.erase(addrConIter); + } + } + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_DISCONNECTED, iter->first, false}); + } + conMap.erase(iter); + } + } + break; + default: + assert(!"internalEvents got invalid type"); + break; + } + } + } while(!internalEvents.empty()); + { // check timed out, check good/bad mode with rtt, remove timed out std::vector removed; @@ -257,6 +365,10 @@ void UDPC::Context::update_impl() { iter->second.toggleT *= 2; } iter->second.toggledTimer = std::chrono::steady_clock::duration::zero(); + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_BAD_MODE, iter->first, false}); + } } else if(iter->second.flags.test(1)) { // good mode, good rtt if(iter->second.toggleTimer >= UDPC::TEN_SECONDS) { @@ -279,6 +391,10 @@ void UDPC::Context::update_impl() { ", port = ", iter->second.port); iter->second.flags.set(1); + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_GOOD_MODE, iter->first, false}); + } } } else { // bad mode, bad rtt @@ -317,6 +433,10 @@ void UDPC::Context::update_impl() { if(cIter->second.flags.test(4)) { idMap.erase(cIter->second.id); } + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_DISCONNECTED, *iter, false}); + } conMap.erase(cIter); } @@ -325,6 +445,7 @@ void UDPC::Context::update_impl() { // move queued in cSendPkts to existing connection's sendPkts { auto sendIter = cSendPkts.begin(); + std::unordered_set dropped; while(true) { auto next = sendIter.current(); if(next) { @@ -352,15 +473,18 @@ void UDPC::Context::update_impl() { break; } } else { - UDPC_CHECK_LOG(this, - UDPC_LoggingType::UDPC_WARNING, - "Dropped queued packet to ", - UDPC_atostr( - (UDPC_HContext)this, - next.value().receiver.addr), - ", port = ", - next.value().receiver.port, - " due to connection not existing"); + if(dropped.find(next.value().receiver) == dropped.end()) { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_WARNING, + "Dropped queued packets to ", + UDPC_atostr( + (UDPC_HContext)this, + next.value().receiver.addr), + ", port = ", + next.value().receiver.port, + " due to connection not existing"); + dropped.insert(next.value().receiver); + } if(sendIter.remove()) { continue; } else { @@ -749,7 +873,12 @@ void UDPC::Context::update_impl() { addrConIter = insertResult.first; } addrConIter->second.insert(identifier); - // TODO trigger event server established connection with client + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_CONNECTED, + identifier, + false}); + } } else if (flags.test(1)) { // is client auto iter = conMap.find(identifier); @@ -768,7 +897,12 @@ void UDPC::Context::update_impl() { ", port = ", ntohs(receivedData.sin6_port), ", got id = ", conID); - // TODO trigger event client established connection with server + if(isReceivingEvents.load()) { + externalEvents.push(UDPC_Event{ + UDPC_ET_CONNECTED, + identifier, + false}); + } } return; } @@ -1227,41 +1361,7 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec return; } - UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Got peer a = ", - UDPC_atostr((UDPC_HContext)ctx, connectionId.addr), - ", p = ", connectionId.port); - - std::lock_guard lock(c->mutex); - - UDPC::ConnectionData newCon(false, c, connectionId.addr, connectionId.scope_id, connectionId.port); - if(newCon.flags.test(5)) { - UDPC_CHECK_LOG(c, - UDPC_LoggingType::UDPC_ERROR, - "Failed to init ConnectionData instance (libsodium init " - "fail) while client establishing connection with ", - UDPC_atostr((UDPC_HContext)c, connectionId.addr), - ", port = ", - connectionId.port); - return; - } - newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT; - - if(c->conMap.find(connectionId) == c->conMap.end()) { - c->conMap.insert(std::make_pair(connectionId, std::move(newCon))); - auto addrConIter = c->addrConMap.find(connectionId.addr); - if(addrConIter == c->addrConMap.end()) { - auto insertResult = c->addrConMap.insert(std::make_pair( - connectionId.addr, - std::unordered_set{} - )); - assert(insertResult.second); - addrConIter = insertResult.first; - } - addrConIter->second.insert(connectionId); - UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Initiating connection..."); - } else { - UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_ERROR, "client_initiate_connection: Already connected to peer"); - } + c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, false}); } void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, @@ -1304,49 +1404,14 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) { return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true); } -int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr) { +void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int dropAllWithAddr) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { - return 0; + return; } - std::lock_guard lock(c->mutex); - - if(dropAllWithAddr) { - auto addrConIter = c->addrConMap.find(connectionId.addr); - if(addrConIter != c->addrConMap.end()) { - for(auto identIter = addrConIter->second.begin(); - identIter != addrConIter->second.end(); - ++identIter) { - auto conIter = c->conMap.find(*identIter); - assert(conIter != c->conMap.end()); - if(conIter->second.flags.test(4)) { - c->idMap.erase(conIter->second.id); - } - c->conMap.erase(conIter); - } - c->addrConMap.erase(addrConIter); - return 1; - } - } else { - auto iter = c->conMap.find(connectionId); - if(iter != c->conMap.end()) { - if(iter->second.flags.test(4)) { - c->idMap.erase(iter->second.id); - } - auto addrConIter = c->addrConMap.find(connectionId.addr); - if(addrConIter != c->addrConMap.end()) { - addrConIter->second.erase(connectionId); - if(addrConIter->second.empty()) { - c->addrConMap.erase(addrConIter); - } - } - c->conMap.erase(iter); - return 1; - } - } - - return 0; + c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr}); + return; } int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { @@ -1390,15 +1455,32 @@ void UDPC_free_list_connected(UDPC_ConnectionId *list) { std::free(list); } +uint32_t UDPC_get_protocol_id(UDPC_HContext ctx) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return 0; + } + + return c->protocolID.load(); +} + uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { return 0; } - std::lock_guard lock(c->mutex); + return c->protocolID.exchange(id); } +UDPC_LoggingType UDPC_get_logging_type(UDPC_HContext ctx) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return UDPC_LoggingType::UDPC_SILENT; + } + return static_cast(c->loggingType.load()); +} + UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { @@ -1407,6 +1489,38 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi return static_cast(c->loggingType.exchange(loggingType)); } +int UDPC_get_receiving_events(UDPC_HContext ctx) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return 0; + } + + return c->isReceivingEvents.load() ? 1 : 0; +} + +int UDPC_set_receiving_events(UDPC_HContext ctx, int isReceivingEvents) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return 0; + } + + return c->isReceivingEvents.exchange(isReceivingEvents != 0); +} + +UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0}; + } + + auto optE = c->externalEvents.top_and_pop_and_rsize(remaining); + if(optE) { + return optE.value(); + } else { + return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0}; + } +} + UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { diff --git a/src/UDPConnection.h b/src/UDPConnection.h index c5c7bae..28f3536 100644 --- a/src/UDPConnection.h +++ b/src/UDPConnection.h @@ -88,6 +88,22 @@ typedef struct { UDPC_ConnectionId receiver; } UDPC_PacketInfo; +typedef enum { + UDPC_ET_NONE, + UDPC_ET_REQUEST_CONNECT, + UDPC_ET_REQUEST_DISCONNECT, + UDPC_ET_CONNECTED, + UDPC_ET_DISCONNECTED, + UDPC_ET_GOOD_MODE, + UDPC_ET_BAD_MODE +} UDPC_EventType; + +typedef struct { + UDPC_EventType type; + UDPC_ConnectionId conId; + int dropAllWithAddr; +} UDPC_Event; + /// port should be in native byte order (not network/big-endian) UDPC_ConnectionId UDPC_create_id(UDPC_IPV6_ADDR_TYPE addr, uint16_t port); @@ -112,7 +128,7 @@ unsigned long UDPC_get_queue_send_current_size(UDPC_HContext ctx); int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting); -int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr); +void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int dropAllWithAddr); int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId); @@ -120,10 +136,20 @@ UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size void UDPC_free_list_connected(UDPC_ConnectionId *list); +uint32_t UDPC_get_protocol_id(UDPC_HContext ctx); + uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id); +UDPC_LoggingType UDPC_get_logging_type(UDPC_HContext ctx); + UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType); +int UPDC_get_receiving_events(UDPC_HContext ctx); + +int UDPC_set_receiving_events(UDPC_HContext ctx, int isReceivingEvents); + +UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining); + UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining); const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId); diff --git a/src/test/UDPC_NetworkTest.cpp b/src/test/UDPC_NetworkTest.cpp index 876cf52..049311a 100644 --- a/src/test/UDPC_NetworkTest.cpp +++ b/src/test/UDPC_NetworkTest.cpp @@ -21,6 +21,7 @@ void usage() { puts("-t "); puts("-n - do not add payload to packets"); puts("-l (silent|error|warning|info|verbose|debug) - log level, default debug"); + puts("-e - enable receiving events"); } int main(int argc, char **argv) { @@ -38,6 +39,7 @@ int main(int argc, char **argv) { unsigned int tickLimit = 15; bool noPayload = false; UDPC_LoggingType logLevel = UDPC_LoggingType::UDPC_DEBUG; + bool isReceivingEvents = false; while(argc > 0) { if(std::strcmp(argv[0], "-c") == 0) { isClient = true; @@ -82,6 +84,9 @@ int main(int argc, char **argv) { usage(); return 1; } + } else if(std::strcmp(argv[0], "-e") == 0) { + isReceivingEvents = true; + puts("Enabled isReceivingEvents"); } else { printf("ERROR: invalid argument \"%s\"\n", argv[0]); usage(); @@ -131,6 +136,7 @@ int main(int argc, char **argv) { return 1; } UDPC_set_logging_type(context, logLevel); + UDPC_set_receiving_events(context, isReceivingEvents ? 1 : 0); unsigned int tick = 0; unsigned int temp = 0; unsigned int temp2, temp3; @@ -138,6 +144,7 @@ int main(int argc, char **argv) { UDPC_ConnectionId *list = nullptr; std::vector sendIds; UDPC_PacketInfo received; + UDPC_Event event; while(true) { std::this_thread::sleep_for(std::chrono::seconds(1)); if(isClient && UDPC_has_connection(context, connectionId) == 0) { @@ -169,6 +176,34 @@ int main(int argc, char **argv) { } } while (size > 0); } + do { + event = UDPC_get_event(context, &size); + if(event.type == UDPC_ET_NONE) { + break; + } + const char *typeString; + switch(event.type) { + case UDPC_ET_CONNECTED: + typeString = "CONNECTED"; + break; + case UDPC_ET_DISCONNECTED: + typeString = "DISCONNECTED"; + break; + case UDPC_ET_GOOD_MODE: + typeString = "GOOD_MODE"; + break; + case UDPC_ET_BAD_MODE: + typeString = "BAD_MODE"; + break; + default: + typeString = "INVALID_TYPE"; + break; + } + printf("Got event %s: %s %u\n", + typeString, + UDPC_atostr(context, event.conId.addr), + event.conId.port); + } while(size > 0); if(tick++ > tickLimit) { break; }