From 69edca40f8a229eb0e51949738415fbd4dcabb4c Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Wed, 20 Feb 2019 15:13:57 +0900 Subject: [PATCH] Impl add to send-queue and recv callbacks, WIP Still not finished, need to set up the thread to send queued packets and receive packets safely. Also need to add to Deque (see TODO comment). --- src/UDPC_Defines.h | 2 + src/UDPC_Deque.h | 2 + src/UDPConnection.c | 190 +++++++++++++++++++++++++++++++++++++------- src/UDPConnection.h | 60 +++++++++++++- 4 files changed, 224 insertions(+), 30 deletions(-) diff --git a/src/UDPC_Defines.h b/src/UDPC_Defines.h index 08de5d2..6bf767e 100644 --- a/src/UDPC_Defines.h +++ b/src/UDPC_Defines.h @@ -52,4 +52,6 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread"; #define UDPC_PACKET_TIMEOUT_SEC 1.0f #define UDPC_GOOD_RTT_LIMIT_SEC 0.25f +#define UDPC_REC_PKTS_ALLOC_SIZE 64 + #endif diff --git a/src/UDPC_Deque.h b/src/UDPC_Deque.h index c3fa935..9434cff 100644 --- a/src/UDPC_Deque.h +++ b/src/UDPC_Deque.h @@ -40,6 +40,8 @@ int UDPC_Deque_push_back(UDPC_Deque *deque, const void *data, uint32_t size); */ int UDPC_Deque_push_front(UDPC_Deque *deque, const void *data, uint32_t size); +// TODO add push_back/push_front variants that realloc on not enough free space + /*! * \return size in bytes of available data */ diff --git a/src/UDPConnection.c b/src/UDPConnection.c index 2e17113..0fa21a3 100644 --- a/src/UDPConnection.c +++ b/src/UDPConnection.c @@ -9,10 +9,23 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) { UDPC_Context *context = malloc(sizeof(UDPC_Context)); context->error = UDPC_SUCCESS; - context->flags = 0; - context->threadFlags = 0; - context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0; + context->flags = 0xC; if(isClient != 0) context->flags |= 0x2; + context->threadFlags = 0; + context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData)); + timespec_get(&context->lastUpdated, TIME_UTC); + context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0; + context->receivedPackets = UDPC_Deque_init( + UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo)); + context->callbackConnected = NULL; + context->callbackConnectedUserData = NULL; + context->callbackDisconnected = NULL; + context->callbackDisconnectedUserData = NULL; + context->callbackReceived = NULL; + context->callbackReceivedUserData = NULL; + + // seed rand + srand(context->lastUpdated.tv_sec); // create socket context->socketHandle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); @@ -20,7 +33,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) { context->socketHandle = 0; context->error = UDPC_ERR_SOCKETFAIL; - fprintf(stderr, "Failed to create socket\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create socket"); return context; } @@ -37,7 +50,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) context->error = UDPC_ERR_SOCKETBINDF; CleanupSocket(context->socketHandle); context->socketHandle = 0; - fprintf(stderr, "Failed to bind socket\n"); + UDPC_INTERNAL_log(context, 0, "Failed to bind socket"); return context; } @@ -56,19 +69,13 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) context->error = UDPC_ERR_SOCKETNONBF; CleanupSocket(context->socketHandle); context->socketHandle = 0; - fprintf(stderr, "Failed to set non-blocking on socket\n"); + UDPC_INTERNAL_log(context, 0, "Failed to set non-blocking on socket"); #if UDPC_PLATFORM == UDPC_PLATFORM_UNKNOWN - fprintf(stderr, "(Unknown platform)\n"); + UDPC_INTERNAL_log(context, 0, "(Unknown platform)"); #endif return context; } - context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData)); - - timespec_get(&context->lastUpdated, TIME_UTC); - - context->flags |= (0x8 | 0x4); - return context; } @@ -81,8 +88,9 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) { CleanupSocket(context->socketHandle); context->socketHandle = 0; - fprintf(stderr, "Failed to create mutex\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create mutex"); context->error = UDPC_ERR_MTXFAIL; + return context; } context->error = UDPC_SUCCESS; @@ -92,7 +100,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) CleanupSocket(context->socketHandle); context->socketHandle = 0; mtx_destroy(&context->tCVMtx); - fprintf(stderr, "Failed to create mutex\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create mutex"); context->error = UDPC_ERR_MTXFAIL; return context; } @@ -105,7 +113,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) context->socketHandle = 0; mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tflagsMtx); - fprintf(stderr, "Failed to create condition variable\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create condition variable"); context->error = UDPC_ERR_CVFAIL; return context; } @@ -120,7 +128,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tflagsMtx); cnd_destroy(&context->threadCV); - fprintf(stderr, "Failed to create thread\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create thread"); context->error = UDPC_ERR_THREADFAIL; return context; } @@ -177,6 +185,87 @@ void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data) UDPC_Deque_destroy(cd->sendPktQueue); } +void UDPC_set_callback_connected( + UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData) +{ + ctx->callbackConnected = fptr; + ctx->callbackConnectedUserData = userData; +} + +void UDPC_set_callback_disconnected( + UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData) +{ + ctx->callbackDisconnected = fptr; + ctx->callbackDisconnectedUserData = userData; +} + +void UDPC_set_callback_received( + UDPC_Context *ctx, UDPC_callback_received fptr, void *userData) +{ + ctx->callbackReceived = fptr; + ctx->callbackReceivedUserData = userData; +} + +void UDPC_check_received(UDPC_Context *ctx) +{ + // TODO use a lock on receivedPackets + if(ctx->callbackReceived) + { + for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x) + { + UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr( + ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x); + ctx->callbackReceived(ctx->callbackReceivedUserData, pinfo->data, pinfo->size); + free(pinfo->data); + } + UDPC_Deque_clear(ctx->receivedPackets); + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Received callback not set"); + for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x) + { + UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr( + ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x); + free(pinfo->data); + } + UDPC_Deque_clear(ctx->receivedPackets); + } +} + +int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size) +{ + UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr); + if(cd) + { + UDPC_INTERNAL_PacketInfo pinfo = { + addr, + 0, + isChecked != 0 ? 0x2 : 0, + malloc(size), + size, + {0, 0} + }; + if(pinfo.data) + { + memcpy(pinfo.data, data, size); + UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo)); + return 1; + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Failed to allocate memory to new send-packet queue entry"); + return 0; + } + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Cannot send to %s when connection has not been esablished", + UDPC_INTERNAL_atostr(ctx, addr)); + return 0; + } +} + uint32_t UDPC_get_error(UDPC_Context *ctx) { uint32_t error = ctx->error; @@ -233,7 +322,7 @@ void UDPC_update(UDPC_Context *ctx) ctx }; timespec_get(&us.tsNow, TIME_UTC); - us.dt = UDPC_ts_diff_to_seconds(&us.tsNow, &ctx->lastUpdated); + us.dt = UDPC_INTERNAL_ts_diff(&us.tsNow, &ctx->lastUpdated); ctx->lastUpdated = us.tsNow; us.removedQueue = UDPC_Deque_init(4 * (ctx->conMap->size)); @@ -301,7 +390,7 @@ void UDPC_update(UDPC_Context *ctx) ntohs(receivedData.sin_port)); UDPC_INTERNAL_ConnectionData newCD = { 1, - 0, + (ctx->flags & 0x2) != 0 ? conID : UDPC_INTERNAL_generate_id(ctx), 0, 0, 0xFFFFFFFF, @@ -317,9 +406,13 @@ void UDPC_update(UDPC_Context *ctx) {0, 0}, 0.0f }; - timespec_get(&newCD.received, TIME_UTC); - timespec_get(&newCD.sent, TIME_UTC); + newCD.received = us.tsNow; + newCD.sent = us.tsNow; UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD); + if(ctx->callbackConnected) + { + ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr); + } } return; } @@ -418,7 +511,23 @@ void UDPC_update(UDPC_Context *ctx) UDPC_INTERNAL_atostr(ctx, cd->addr)); } - // TODO received packet callback here + if(bytes > 20) + { + UDPC_INTERNAL_PacketInfo receivedInfo; + receivedInfo.addr = receivedData.sin_addr.s_addr; + receivedInfo.id = conID; + receivedInfo.flags = (isNotRecvCheck != 0 ? 0 : 0x2) | (isResent != 0 ? 0x5 : 0); + receivedInfo.data = malloc(bytes - 20); + memcpy(receivedInfo.data, ctx->recvBuf + 20, bytes - 20); + receivedInfo.size = bytes - 20; + receivedInfo.sent = us.tsNow; + + UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo)); + } + else + { + UDPC_INTERNAL_log(ctx, 3, "Packet has no payload, not adding to received queue"); + } } void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data) @@ -428,12 +537,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data) UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data; // check for timed out connection - if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS) + if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS) { UDPC_Deque_push_back(us->removedQueue, &addr, 4); UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d", UDPC_INTERNAL_atostr(us->ctx, addr), cd->port); + if(us->ctx->callbackDisconnected) + { + us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr); + } return; } @@ -513,7 +626,7 @@ void UDPC_INTERNAL_update_send(void *userData, uint32_t addr, char *data) if(cd->sendPktQueue->size == 0) { // send packet queue is empty, send heartbeat packet - if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL) + if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL) { return; } @@ -653,7 +766,7 @@ void UDPC_INTERNAL_update_rtt( UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(cd->sentPkts, sizeof(UDPC_INTERNAL_PacketInfo), x); if(pinfo->id == rseq) { - float diff = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent); + float diff = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent); if(diff > cd->rtt) { cd->rtt += (diff - cd->rtt) / 10.0f; @@ -705,7 +818,7 @@ void UDPC_INTERNAL_check_pkt_timeout( // is not received checked or already resent break; } - float seconds = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent); + float seconds = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent); if(seconds >= UDPC_PACKET_TIMEOUT_SEC) { if(pinfo->size <= 20) @@ -747,7 +860,7 @@ void UDPC_INTERNAL_check_pkt_timeout( } } -float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1) +float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1) { float sec = 0.0f; if(!ts0 || !ts1) @@ -814,6 +927,7 @@ int UDPC_INTERNAL_threadfn(void *context) } mtx_lock(&ctx->tCVMtx); cnd_timedwait(&ctx->threadCV, &ctx->tCVMtx, &ts); + UDPC_update(ctx); mtx_unlock(&ctx->tCVMtx); mtx_lock(&ctx->tflagsMtx); @@ -916,3 +1030,25 @@ char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr) return ctx->atostrBuf; } + +uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx) +{ + uint32_t newID = 0x10000000; + + while(newID == 0x10000000) + { + newID = rand() % 0x10000000; + UDPC_HashMap_itercall(ctx->conMap, UDPC_INTERNAL_check_ids, &newID); + } + + return newID; +} + +void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data) +{ + UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data; + if(cd->id == *((uint32_t*)userData)) + { + *((uint32_t*)userData) = 0x10000000; + } +} diff --git a/src/UDPConnection.h b/src/UDPConnection.h index a1ab794..d1290b2 100644 --- a/src/UDPConnection.h +++ b/src/UDPConnection.h @@ -27,6 +27,27 @@ #define UDPC_ATOSTR_BUF_SIZE 16 +/// (void *userData, uint32_t address) +/*! + * Note address is in network byte order (usually big-endian) + */ +typedef void (*UDPC_callback_connected)(void*, uint32_t); + +/// (void *userData, uint32_t address) +/*! + * Note address is in network byte order (usually big-endian) + */ +typedef void (*UDPC_callback_disconnected)(void*, uint32_t); + +/// (void *userData, char *packetData, uint32_t packetSize) +/*! + * The data pointed to by the packetData argument is to data internally managed + * by the UDPC_Context. It will change every time this callback is called so do + * not depend on it persisting. This means you should copy the data out of it + * when the callback is invoked and work with the copied data. + */ +typedef void (*UDPC_callback_received)(void*, char*, uint32_t); + /// This struct should not be used outside of this library typedef struct { uint32_t addr; // in network order (big-endian) @@ -37,7 +58,7 @@ typedef struct { * 0x4 - has been re-sent */ uint32_t flags; - char *data; // no-header in sendPktQueue, header in sentPkts + char *data; // no-header in sendPktQueue and receivedPackets, header in sentPkts uint32_t size; struct timespec sent; } UDPC_INTERNAL_PacketInfo; @@ -94,6 +115,14 @@ typedef struct { struct timespec lastUpdated; char atostrBuf[UDPC_ATOSTR_BUF_SIZE]; char recvBuf[UDPC_PACKET_MAX_SIZE]; + UDPC_Deque *receivedPackets; + + UDPC_callback_connected callbackConnected; + void *callbackConnectedUserData; + UDPC_callback_disconnected callbackDisconnected; + void *callbackDisconnectedUserData; + UDPC_callback_received callbackReceived; + void *callbackReceivedUserData; } UDPC_Context; typedef struct { @@ -111,6 +140,27 @@ void UDPC_destroy(UDPC_Context *ctx); void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data); +void UDPC_set_callback_connected( + UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData); + +void UDPC_set_callback_disconnected( + UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData); + +void UDPC_set_callback_received( + UDPC_Context *ctx, UDPC_callback_received fptr, void *userData); + +void UDPC_check_received(UDPC_Context *ctx); + +/*! + * \brief Queues a packet to send to a connected peer + * Note addr is expected to be in network-byte-order (big-endian). + * If isChecked is non-zero, UDPC will attempt to resend the packet if peer has + * not received it within UDPC_PACKET_TIMEOUT_SEC seconds. + * \return non-zero on success + */ +int UDPC_queue_send( + UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size); + uint32_t UDPC_get_error(UDPC_Context *ctx); const char* UDPC_get_error_str(uint32_t error); @@ -146,9 +196,9 @@ void UDPC_INTERNAL_check_pkt_timeout( uint32_t ack, struct timespec *tsNow); -float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1); +float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1); -int UDPC_INTERNAL_threadfn(void *context); // internal usage only +int UDPC_INTERNAL_threadfn(void *context); /* * 0x1 - is ping @@ -173,4 +223,8 @@ void UDPC_INTERNAL_log(UDPC_Context *ctx, uint32_t level, const char *msg, ...); char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr); +uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx); + +void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data); + #endif