Add events, refactorings

Added event system to lessen the use of the main mutex and instead use
thread safe data structures (TSLQueue). Also can enable and check events
during execution (connect, disconnect, good mode, bad mode).

Fixes and refactorings.
This commit is contained in:
Stephen Seo 2019-11-11 16:08:51 +09:00
parent d86b7e4e1d
commit 7b5cf3b6f8
4 changed files with 269 additions and 89 deletions

View file

@ -97,7 +97,7 @@ struct ConnectionData {
* 4 - is id set
* 5 - error initializing keys for public key encryption
*/
std::bitset<32> flags;
std::bitset<8> flags;
uint32_t id;
uint32_t lseq;
uint32_t rseq;
@ -183,8 +183,9 @@ public:
* 0 - is threaded
* 1 - is client
*/
std::bitset<32> flags;
std::bitset<8> flags;
std::atomic_bool isAcceptNewConnections;
std::atomic_bool isReceivingEvents;
std::atomic_uint32_t protocolID;
std::atomic_uint_fast8_t loggingType;
std::atomic_uint32_t atostrBufIndex;
@ -202,6 +203,10 @@ public:
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
TSLQueue<UDPC_PacketInfo> receivedPkts;
TSLQueue<UDPC_PacketInfo> cSendPkts;
// handled internally
TSLQueue<UDPC_Event> internalEvents;
// handled via interface, if isReceivingEvents is true
TSLQueue<UDPC_Event> externalEvents;
std::default_random_engine rng_engine;

View file

@ -163,6 +163,7 @@ UDPC::Context::Context(bool isThreaded) :
_contextIdentifier(UDPC_CONTEXT_IDENTIFIER),
flags(),
isAcceptNewConnections(true),
isReceivingEvents(false),
protocolID(UDPC_DEFAULT_PROTOCOL_ID),
#ifndef NDEBUG
loggingType(UDPC_DEBUG),
@ -225,6 +226,113 @@ void UDPC::Context::update_impl() {
std::chrono::steady_clock::duration temp_dt_fs;
lastUpdated = now;
// handle internalEvents
do {
auto optE = internalEvents.top_and_pop();
if(optE.has_value()) {
switch(optE.value().type) {
case UDPC_ET_REQUEST_CONNECT:
{
UDPC::ConnectionData newCon(
false,
this,
optE.value().conId.addr,
optE.value().conId.scope_id,
optE.value().conId.port);
if(newCon.flags.test(5)) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to init ConnectionData instance (libsodium "
"init fail) while client establishing connection with ",
UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr),
" port ",
optE.value().conId.port);
continue;
}
newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
if(conMap.find(optE.value().conId) == conMap.end()) {
conMap.insert(std::make_pair(
optE.value().conId,
std::move(newCon)));
auto addrConIter = addrConMap.find(optE.value().conId.addr);
if(addrConIter == addrConMap.end()) {
auto insertResult = addrConMap.insert(std::make_pair(
optE.value().conId.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}));
assert(insertResult.second &&
"new connection insert into addrConMap must not fail");
addrConIter = insertResult.first;
}
addrConIter->second.insert(optE.value().conId);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_INFO,
"Client initiating connection to ",
UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr),
" port ",
optE.value().conId.port,
" ...");
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"Client initiate connection, already connected to peer ",
UDPC_atostr((UDPC_HContext)this, optE.value().conId.addr),
" port ",
optE.value().conId.port);
}
}
break;
case UDPC_ET_REQUEST_DISCONNECT:
if(optE.value().dropAllWithAddr != 0) {
// drop all connections with same address
auto addrConIter = addrConMap.find(optE.value().conId.addr);
if(addrConIter != addrConMap.end()) {
for(auto identIter = addrConIter->second.begin();
identIter != addrConIter->second.end();
++identIter) {
auto conIter = conMap.find(*identIter);
assert(conIter != conMap.end() &&
"conMap must have connection listed in addrConMap");
if(conIter->second.flags.test(4)) {
idMap.erase(conIter->second.id);
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, conIter->first, false});
}
conMap.erase(conIter);
}
addrConMap.erase(addrConIter);
}
} else {
// drop only specific connection with addr and port
auto iter = conMap.find(optE.value().conId);
if(iter != conMap.end()) {
if(iter->second.flags.test(4)) {
idMap.erase(iter->second.id);
}
auto addrConIter = addrConMap.find(optE.value().conId.addr);
if(addrConIter != addrConMap.end()) {
addrConIter->second.erase(optE.value().conId);
if(addrConIter->second.empty()) {
addrConMap.erase(addrConIter);
}
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, iter->first, false});
}
conMap.erase(iter);
}
}
break;
default:
assert(!"internalEvents got invalid type");
break;
}
}
} while(!internalEvents.empty());
{
// check timed out, check good/bad mode with rtt, remove timed out
std::vector<UDPC_ConnectionId> removed;
@ -257,6 +365,10 @@ void UDPC::Context::update_impl() {
iter->second.toggleT *= 2;
}
iter->second.toggledTimer = std::chrono::steady_clock::duration::zero();
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_BAD_MODE, iter->first, false});
}
} else if(iter->second.flags.test(1)) {
// good mode, good rtt
if(iter->second.toggleTimer >= UDPC::TEN_SECONDS) {
@ -279,6 +391,10 @@ void UDPC::Context::update_impl() {
", port = ",
iter->second.port);
iter->second.flags.set(1);
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_GOOD_MODE, iter->first, false});
}
}
} else {
// bad mode, bad rtt
@ -317,6 +433,10 @@ void UDPC::Context::update_impl() {
if(cIter->second.flags.test(4)) {
idMap.erase(cIter->second.id);
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, *iter, false});
}
conMap.erase(cIter);
}
@ -325,6 +445,7 @@ void UDPC::Context::update_impl() {
// move queued in cSendPkts to existing connection's sendPkts
{
auto sendIter = cSendPkts.begin();
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher> dropped;
while(true) {
auto next = sendIter.current();
if(next) {
@ -352,15 +473,18 @@ void UDPC::Context::update_impl() {
break;
}
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"Dropped queued packet to ",
UDPC_atostr(
(UDPC_HContext)this,
next.value().receiver.addr),
", port = ",
next.value().receiver.port,
" due to connection not existing");
if(dropped.find(next.value().receiver) == dropped.end()) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"Dropped queued packets to ",
UDPC_atostr(
(UDPC_HContext)this,
next.value().receiver.addr),
", port = ",
next.value().receiver.port,
" due to connection not existing");
dropped.insert(next.value().receiver);
}
if(sendIter.remove()) {
continue;
} else {
@ -749,7 +873,12 @@ void UDPC::Context::update_impl() {
addrConIter = insertResult.first;
}
addrConIter->second.insert(identifier);
// TODO trigger event server established connection with client
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_CONNECTED,
identifier,
false});
}
} else if (flags.test(1)) {
// is client
auto iter = conMap.find(identifier);
@ -768,7 +897,12 @@ void UDPC::Context::update_impl() {
", port = ",
ntohs(receivedData.sin6_port),
", got id = ", conID);
// TODO trigger event client established connection with server
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_CONNECTED,
identifier,
false});
}
}
return;
}
@ -1227,41 +1361,7 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
return;
}
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Got peer a = ",
UDPC_atostr((UDPC_HContext)ctx, connectionId.addr),
", p = ", connectionId.port);
std::lock_guard<std::mutex> lock(c->mutex);
UDPC::ConnectionData newCon(false, c, connectionId.addr, connectionId.scope_id, connectionId.port);
if(newCon.flags.test(5)) {
UDPC_CHECK_LOG(c,
UDPC_LoggingType::UDPC_ERROR,
"Failed to init ConnectionData instance (libsodium init "
"fail) while client establishing connection with ",
UDPC_atostr((UDPC_HContext)c, connectionId.addr),
", port = ",
connectionId.port);
return;
}
newCon.sent = std::chrono::steady_clock::now() - UDPC::INIT_PKT_INTERVAL_DT;
if(c->conMap.find(connectionId) == c->conMap.end()) {
c->conMap.insert(std::make_pair(connectionId, std::move(newCon)));
auto addrConIter = c->addrConMap.find(connectionId.addr);
if(addrConIter == c->addrConMap.end()) {
auto insertResult = c->addrConMap.insert(std::make_pair(
connectionId.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}
));
assert(insertResult.second);
addrConIter = insertResult.first;
}
addrConIter->second.insert(connectionId);
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_INFO, "client_initiate_connection: Initiating connection...");
} else {
UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_ERROR, "client_initiate_connection: Already connected to peer");
}
c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_CONNECT, connectionId, false});
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
@ -1304,49 +1404,14 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true);
}
int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr) {
void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int dropAllWithAddr) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
return;
}
std::lock_guard<std::mutex> lock(c->mutex);
if(dropAllWithAddr) {
auto addrConIter = c->addrConMap.find(connectionId.addr);
if(addrConIter != c->addrConMap.end()) {
for(auto identIter = addrConIter->second.begin();
identIter != addrConIter->second.end();
++identIter) {
auto conIter = c->conMap.find(*identIter);
assert(conIter != c->conMap.end());
if(conIter->second.flags.test(4)) {
c->idMap.erase(conIter->second.id);
}
c->conMap.erase(conIter);
}
c->addrConMap.erase(addrConIter);
return 1;
}
} else {
auto iter = c->conMap.find(connectionId);
if(iter != c->conMap.end()) {
if(iter->second.flags.test(4)) {
c->idMap.erase(iter->second.id);
}
auto addrConIter = c->addrConMap.find(connectionId.addr);
if(addrConIter != c->addrConMap.end()) {
addrConIter->second.erase(connectionId);
if(addrConIter->second.empty()) {
c->addrConMap.erase(addrConIter);
}
}
c->conMap.erase(iter);
return 1;
}
}
return 0;
c->internalEvents.push(UDPC_Event{UDPC_ET_REQUEST_DISCONNECT, connectionId, dropAllWithAddr});
return;
}
int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
@ -1390,15 +1455,32 @@ void UDPC_free_list_connected(UDPC_ConnectionId *list) {
std::free(list);
}
uint32_t UDPC_get_protocol_id(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
return c->protocolID.load();
}
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
std::lock_guard<std::mutex> lock(c->mutex);
return c->protocolID.exchange(id);
}
UDPC_LoggingType UDPC_get_logging_type(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return UDPC_LoggingType::UDPC_SILENT;
}
return static_cast<UDPC_LoggingType>(c->loggingType.load());
}
UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
@ -1407,6 +1489,38 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi
return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
}
int UDPC_get_receiving_events(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
return c->isReceivingEvents.load() ? 1 : 0;
}
int UDPC_set_receiving_events(UDPC_HContext ctx, int isReceivingEvents) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
return c->isReceivingEvents.exchange(isReceivingEvents != 0);
}
UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
}
auto optE = c->externalEvents.top_and_pop_and_rsize(remaining);
if(optE) {
return optE.value();
} else {
return UDPC_Event{UDPC_ET_NONE, UDPC_create_id_anyaddr(0), 0};
}
}
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {

View file

@ -88,6 +88,22 @@ typedef struct {
UDPC_ConnectionId receiver;
} UDPC_PacketInfo;
typedef enum {
UDPC_ET_NONE,
UDPC_ET_REQUEST_CONNECT,
UDPC_ET_REQUEST_DISCONNECT,
UDPC_ET_CONNECTED,
UDPC_ET_DISCONNECTED,
UDPC_ET_GOOD_MODE,
UDPC_ET_BAD_MODE
} UDPC_EventType;
typedef struct {
UDPC_EventType type;
UDPC_ConnectionId conId;
int dropAllWithAddr;
} UDPC_Event;
/// port should be in native byte order (not network/big-endian)
UDPC_ConnectionId UDPC_create_id(UDPC_IPV6_ADDR_TYPE addr, uint16_t port);
@ -112,7 +128,7 @@ unsigned long UDPC_get_queue_send_current_size(UDPC_HContext ctx);
int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting);
int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool dropAllWithAddr);
void UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, int dropAllWithAddr);
int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
@ -120,10 +136,20 @@ UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size
void UDPC_free_list_connected(UDPC_ConnectionId *list);
uint32_t UDPC_get_protocol_id(UDPC_HContext ctx);
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id);
UDPC_LoggingType UDPC_get_logging_type(UDPC_HContext ctx);
UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType);
int UPDC_get_receiving_events(UDPC_HContext ctx);
int UDPC_set_receiving_events(UDPC_HContext ctx, int isReceivingEvents);
UDPC_Event UDPC_get_event(UDPC_HContext ctx, unsigned long *remaining);
UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned long *remaining);
const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId);

View file

@ -21,6 +21,7 @@ void usage() {
puts("-t <tick_count>");
puts("-n - do not add payload to packets");
puts("-l (silent|error|warning|info|verbose|debug) - log level, default debug");
puts("-e - enable receiving events");
}
int main(int argc, char **argv) {
@ -38,6 +39,7 @@ int main(int argc, char **argv) {
unsigned int tickLimit = 15;
bool noPayload = false;
UDPC_LoggingType logLevel = UDPC_LoggingType::UDPC_DEBUG;
bool isReceivingEvents = false;
while(argc > 0) {
if(std::strcmp(argv[0], "-c") == 0) {
isClient = true;
@ -82,6 +84,9 @@ int main(int argc, char **argv) {
usage();
return 1;
}
} else if(std::strcmp(argv[0], "-e") == 0) {
isReceivingEvents = true;
puts("Enabled isReceivingEvents");
} else {
printf("ERROR: invalid argument \"%s\"\n", argv[0]);
usage();
@ -131,6 +136,7 @@ int main(int argc, char **argv) {
return 1;
}
UDPC_set_logging_type(context, logLevel);
UDPC_set_receiving_events(context, isReceivingEvents ? 1 : 0);
unsigned int tick = 0;
unsigned int temp = 0;
unsigned int temp2, temp3;
@ -138,6 +144,7 @@ int main(int argc, char **argv) {
UDPC_ConnectionId *list = nullptr;
std::vector<unsigned int> sendIds;
UDPC_PacketInfo received;
UDPC_Event event;
while(true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
if(isClient && UDPC_has_connection(context, connectionId) == 0) {
@ -169,6 +176,34 @@ int main(int argc, char **argv) {
}
} while (size > 0);
}
do {
event = UDPC_get_event(context, &size);
if(event.type == UDPC_ET_NONE) {
break;
}
const char *typeString;
switch(event.type) {
case UDPC_ET_CONNECTED:
typeString = "CONNECTED";
break;
case UDPC_ET_DISCONNECTED:
typeString = "DISCONNECTED";
break;
case UDPC_ET_GOOD_MODE:
typeString = "GOOD_MODE";
break;
case UDPC_ET_BAD_MODE:
typeString = "BAD_MODE";
break;
default:
typeString = "INVALID_TYPE";
break;
}
printf("Got event %s: %s %u\n",
typeString,
UDPC_atostr(context, event.conId.addr),
event.conId.port);
} while(size > 0);
if(tick++ > tickLimit) {
break;
}