diff --git a/src/UDPC_Defines.h b/src/UDPC_Defines.h index 08de5d2..6bf767e 100644 --- a/src/UDPC_Defines.h +++ b/src/UDPC_Defines.h @@ -52,4 +52,6 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread"; #define UDPC_PACKET_TIMEOUT_SEC 1.0f #define UDPC_GOOD_RTT_LIMIT_SEC 0.25f +#define UDPC_REC_PKTS_ALLOC_SIZE 64 + #endif diff --git a/src/UDPC_Deque.h b/src/UDPC_Deque.h index c3fa935..9434cff 100644 --- a/src/UDPC_Deque.h +++ b/src/UDPC_Deque.h @@ -40,6 +40,8 @@ int UDPC_Deque_push_back(UDPC_Deque *deque, const void *data, uint32_t size); */ int UDPC_Deque_push_front(UDPC_Deque *deque, const void *data, uint32_t size); +// TODO add push_back/push_front variants that realloc on not enough free space + /*! * \return size in bytes of available data */ diff --git a/src/UDPConnection.c b/src/UDPConnection.c index 2e17113..0fa21a3 100644 --- a/src/UDPConnection.c +++ b/src/UDPConnection.c @@ -9,10 +9,23 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) { UDPC_Context *context = malloc(sizeof(UDPC_Context)); context->error = UDPC_SUCCESS; - context->flags = 0; - context->threadFlags = 0; - context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0; + context->flags = 0xC; if(isClient != 0) context->flags |= 0x2; + context->threadFlags = 0; + context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData)); + timespec_get(&context->lastUpdated, TIME_UTC); + context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0; + context->receivedPackets = UDPC_Deque_init( + UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo)); + context->callbackConnected = NULL; + context->callbackConnectedUserData = NULL; + context->callbackDisconnected = NULL; + context->callbackDisconnectedUserData = NULL; + context->callbackReceived = NULL; + context->callbackReceivedUserData = NULL; + + // seed rand + srand(context->lastUpdated.tv_sec); // create socket context->socketHandle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); @@ -20,7 +33,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) { context->socketHandle = 0; context->error = UDPC_ERR_SOCKETFAIL; - fprintf(stderr, "Failed to create socket\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create socket"); return context; } @@ -37,7 +50,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) context->error = UDPC_ERR_SOCKETBINDF; CleanupSocket(context->socketHandle); context->socketHandle = 0; - fprintf(stderr, "Failed to bind socket\n"); + UDPC_INTERNAL_log(context, 0, "Failed to bind socket"); return context; } @@ -56,19 +69,13 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) context->error = UDPC_ERR_SOCKETNONBF; CleanupSocket(context->socketHandle); context->socketHandle = 0; - fprintf(stderr, "Failed to set non-blocking on socket\n"); + UDPC_INTERNAL_log(context, 0, "Failed to set non-blocking on socket"); #if UDPC_PLATFORM == UDPC_PLATFORM_UNKNOWN - fprintf(stderr, "(Unknown platform)\n"); + UDPC_INTERNAL_log(context, 0, "(Unknown platform)"); #endif return context; } - context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData)); - - timespec_get(&context->lastUpdated, TIME_UTC); - - context->flags |= (0x8 | 0x4); - return context; } @@ -81,8 +88,9 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) { CleanupSocket(context->socketHandle); context->socketHandle = 0; - fprintf(stderr, "Failed to create mutex\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create mutex"); context->error = UDPC_ERR_MTXFAIL; + return context; } context->error = UDPC_SUCCESS; @@ -92,7 +100,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) CleanupSocket(context->socketHandle); context->socketHandle = 0; mtx_destroy(&context->tCVMtx); - fprintf(stderr, "Failed to create mutex\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create mutex"); context->error = UDPC_ERR_MTXFAIL; return context; } @@ -105,7 +113,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) context->socketHandle = 0; mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tflagsMtx); - fprintf(stderr, "Failed to create condition variable\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create condition variable"); context->error = UDPC_ERR_CVFAIL; return context; } @@ -120,7 +128,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient) mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tflagsMtx); cnd_destroy(&context->threadCV); - fprintf(stderr, "Failed to create thread\n"); + UDPC_INTERNAL_log(context, 0, "Failed to create thread"); context->error = UDPC_ERR_THREADFAIL; return context; } @@ -177,6 +185,87 @@ void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data) UDPC_Deque_destroy(cd->sendPktQueue); } +void UDPC_set_callback_connected( + UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData) +{ + ctx->callbackConnected = fptr; + ctx->callbackConnectedUserData = userData; +} + +void UDPC_set_callback_disconnected( + UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData) +{ + ctx->callbackDisconnected = fptr; + ctx->callbackDisconnectedUserData = userData; +} + +void UDPC_set_callback_received( + UDPC_Context *ctx, UDPC_callback_received fptr, void *userData) +{ + ctx->callbackReceived = fptr; + ctx->callbackReceivedUserData = userData; +} + +void UDPC_check_received(UDPC_Context *ctx) +{ + // TODO use a lock on receivedPackets + if(ctx->callbackReceived) + { + for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x) + { + UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr( + ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x); + ctx->callbackReceived(ctx->callbackReceivedUserData, pinfo->data, pinfo->size); + free(pinfo->data); + } + UDPC_Deque_clear(ctx->receivedPackets); + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Received callback not set"); + for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x) + { + UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr( + ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x); + free(pinfo->data); + } + UDPC_Deque_clear(ctx->receivedPackets); + } +} + +int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size) +{ + UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr); + if(cd) + { + UDPC_INTERNAL_PacketInfo pinfo = { + addr, + 0, + isChecked != 0 ? 0x2 : 0, + malloc(size), + size, + {0, 0} + }; + if(pinfo.data) + { + memcpy(pinfo.data, data, size); + UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo)); + return 1; + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Failed to allocate memory to new send-packet queue entry"); + return 0; + } + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Cannot send to %s when connection has not been esablished", + UDPC_INTERNAL_atostr(ctx, addr)); + return 0; + } +} + uint32_t UDPC_get_error(UDPC_Context *ctx) { uint32_t error = ctx->error; @@ -233,7 +322,7 @@ void UDPC_update(UDPC_Context *ctx) ctx }; timespec_get(&us.tsNow, TIME_UTC); - us.dt = UDPC_ts_diff_to_seconds(&us.tsNow, &ctx->lastUpdated); + us.dt = UDPC_INTERNAL_ts_diff(&us.tsNow, &ctx->lastUpdated); ctx->lastUpdated = us.tsNow; us.removedQueue = UDPC_Deque_init(4 * (ctx->conMap->size)); @@ -301,7 +390,7 @@ void UDPC_update(UDPC_Context *ctx) ntohs(receivedData.sin_port)); UDPC_INTERNAL_ConnectionData newCD = { 1, - 0, + (ctx->flags & 0x2) != 0 ? conID : UDPC_INTERNAL_generate_id(ctx), 0, 0, 0xFFFFFFFF, @@ -317,9 +406,13 @@ void UDPC_update(UDPC_Context *ctx) {0, 0}, 0.0f }; - timespec_get(&newCD.received, TIME_UTC); - timespec_get(&newCD.sent, TIME_UTC); + newCD.received = us.tsNow; + newCD.sent = us.tsNow; UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD); + if(ctx->callbackConnected) + { + ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr); + } } return; } @@ -418,7 +511,23 @@ void UDPC_update(UDPC_Context *ctx) UDPC_INTERNAL_atostr(ctx, cd->addr)); } - // TODO received packet callback here + if(bytes > 20) + { + UDPC_INTERNAL_PacketInfo receivedInfo; + receivedInfo.addr = receivedData.sin_addr.s_addr; + receivedInfo.id = conID; + receivedInfo.flags = (isNotRecvCheck != 0 ? 0 : 0x2) | (isResent != 0 ? 0x5 : 0); + receivedInfo.data = malloc(bytes - 20); + memcpy(receivedInfo.data, ctx->recvBuf + 20, bytes - 20); + receivedInfo.size = bytes - 20; + receivedInfo.sent = us.tsNow; + + UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo)); + } + else + { + UDPC_INTERNAL_log(ctx, 3, "Packet has no payload, not adding to received queue"); + } } void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data) @@ -428,12 +537,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data) UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data; // check for timed out connection - if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS) + if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS) { UDPC_Deque_push_back(us->removedQueue, &addr, 4); UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d", UDPC_INTERNAL_atostr(us->ctx, addr), cd->port); + if(us->ctx->callbackDisconnected) + { + us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr); + } return; } @@ -513,7 +626,7 @@ void UDPC_INTERNAL_update_send(void *userData, uint32_t addr, char *data) if(cd->sendPktQueue->size == 0) { // send packet queue is empty, send heartbeat packet - if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL) + if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL) { return; } @@ -653,7 +766,7 @@ void UDPC_INTERNAL_update_rtt( UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(cd->sentPkts, sizeof(UDPC_INTERNAL_PacketInfo), x); if(pinfo->id == rseq) { - float diff = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent); + float diff = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent); if(diff > cd->rtt) { cd->rtt += (diff - cd->rtt) / 10.0f; @@ -705,7 +818,7 @@ void UDPC_INTERNAL_check_pkt_timeout( // is not received checked or already resent break; } - float seconds = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent); + float seconds = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent); if(seconds >= UDPC_PACKET_TIMEOUT_SEC) { if(pinfo->size <= 20) @@ -747,7 +860,7 @@ void UDPC_INTERNAL_check_pkt_timeout( } } -float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1) +float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1) { float sec = 0.0f; if(!ts0 || !ts1) @@ -814,6 +927,7 @@ int UDPC_INTERNAL_threadfn(void *context) } mtx_lock(&ctx->tCVMtx); cnd_timedwait(&ctx->threadCV, &ctx->tCVMtx, &ts); + UDPC_update(ctx); mtx_unlock(&ctx->tCVMtx); mtx_lock(&ctx->tflagsMtx); @@ -916,3 +1030,25 @@ char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr) return ctx->atostrBuf; } + +uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx) +{ + uint32_t newID = 0x10000000; + + while(newID == 0x10000000) + { + newID = rand() % 0x10000000; + UDPC_HashMap_itercall(ctx->conMap, UDPC_INTERNAL_check_ids, &newID); + } + + return newID; +} + +void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data) +{ + UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data; + if(cd->id == *((uint32_t*)userData)) + { + *((uint32_t*)userData) = 0x10000000; + } +} diff --git a/src/UDPConnection.h b/src/UDPConnection.h index a1ab794..d1290b2 100644 --- a/src/UDPConnection.h +++ b/src/UDPConnection.h @@ -27,6 +27,27 @@ #define UDPC_ATOSTR_BUF_SIZE 16 +/// (void *userData, uint32_t address) +/*! + * Note address is in network byte order (usually big-endian) + */ +typedef void (*UDPC_callback_connected)(void*, uint32_t); + +/// (void *userData, uint32_t address) +/*! + * Note address is in network byte order (usually big-endian) + */ +typedef void (*UDPC_callback_disconnected)(void*, uint32_t); + +/// (void *userData, char *packetData, uint32_t packetSize) +/*! + * The data pointed to by the packetData argument is to data internally managed + * by the UDPC_Context. It will change every time this callback is called so do + * not depend on it persisting. This means you should copy the data out of it + * when the callback is invoked and work with the copied data. + */ +typedef void (*UDPC_callback_received)(void*, char*, uint32_t); + /// This struct should not be used outside of this library typedef struct { uint32_t addr; // in network order (big-endian) @@ -37,7 +58,7 @@ typedef struct { * 0x4 - has been re-sent */ uint32_t flags; - char *data; // no-header in sendPktQueue, header in sentPkts + char *data; // no-header in sendPktQueue and receivedPackets, header in sentPkts uint32_t size; struct timespec sent; } UDPC_INTERNAL_PacketInfo; @@ -94,6 +115,14 @@ typedef struct { struct timespec lastUpdated; char atostrBuf[UDPC_ATOSTR_BUF_SIZE]; char recvBuf[UDPC_PACKET_MAX_SIZE]; + UDPC_Deque *receivedPackets; + + UDPC_callback_connected callbackConnected; + void *callbackConnectedUserData; + UDPC_callback_disconnected callbackDisconnected; + void *callbackDisconnectedUserData; + UDPC_callback_received callbackReceived; + void *callbackReceivedUserData; } UDPC_Context; typedef struct { @@ -111,6 +140,27 @@ void UDPC_destroy(UDPC_Context *ctx); void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data); +void UDPC_set_callback_connected( + UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData); + +void UDPC_set_callback_disconnected( + UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData); + +void UDPC_set_callback_received( + UDPC_Context *ctx, UDPC_callback_received fptr, void *userData); + +void UDPC_check_received(UDPC_Context *ctx); + +/*! + * \brief Queues a packet to send to a connected peer + * Note addr is expected to be in network-byte-order (big-endian). + * If isChecked is non-zero, UDPC will attempt to resend the packet if peer has + * not received it within UDPC_PACKET_TIMEOUT_SEC seconds. + * \return non-zero on success + */ +int UDPC_queue_send( + UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size); + uint32_t UDPC_get_error(UDPC_Context *ctx); const char* UDPC_get_error_str(uint32_t error); @@ -146,9 +196,9 @@ void UDPC_INTERNAL_check_pkt_timeout( uint32_t ack, struct timespec *tsNow); -float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1); +float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1); -int UDPC_INTERNAL_threadfn(void *context); // internal usage only +int UDPC_INTERNAL_threadfn(void *context); /* * 0x1 - is ping @@ -173,4 +223,8 @@ void UDPC_INTERNAL_log(UDPC_Context *ctx, uint32_t level, const char *msg, ...); char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr); +uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx); + +void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data); + #endif