diff --git a/src/UDPC_Defines.h b/src/UDPC_Defines.h index 6bf767e..6549a1a 100644 --- a/src/UDPC_Defines.h +++ b/src/UDPC_Defines.h @@ -46,12 +46,16 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread"; #define UDPC_SENT_PKTS_MAX_SIZE 34 #define UDPC_SENT_PKTS_ALLOC_SIZE 35 +#define UDPC_SEND_PKTS_ALLOC_SIZE 40 #define UDPC_PACKET_MAX_SIZE 8192 #define UDPC_PACKET_TIMEOUT_SEC 1.0f #define UDPC_GOOD_RTT_LIMIT_SEC 0.25f -#define UDPC_REC_PKTS_ALLOC_SIZE 64 +#define UDPC_REC_PKTS_ALLOC_SIZE 128 + +#define UDPC_CONNECTED_EVENT_SIZE 64 +#define UDPC_DISCONNECTED_EVENT_SIZE 64 #endif diff --git a/src/UDPConnection.c b/src/UDPConnection.c index 0fa21a3..50eaa77 100644 --- a/src/UDPConnection.c +++ b/src/UDPConnection.c @@ -15,6 +15,10 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient) context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData)); timespec_get(&context->lastUpdated, TIME_UTC); context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0; + context->connectedEvents = UDPC_Deque_init( + UDPC_CONNECTED_EVENT_SIZE * 4); + context->disconnectedEvents = UDPC_Deque_init( + UDPC_DISCONNECTED_EVENT_SIZE * 4); context->receivedPackets = UDPC_Deque_init( UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo)); context->callbackConnected = NULL; @@ -206,9 +210,39 @@ void UDPC_set_callback_received( ctx->callbackReceivedUserData = userData; } -void UDPC_check_received(UDPC_Context *ctx) +void UDPC_check_events(UDPC_Context *ctx) { // TODO use a lock on receivedPackets + if(ctx->callbackConnected) + { + for(int x = 0; x * 4 < ctx->connectedEvents->size; ++x) + { + ctx->callbackConnected(ctx->callbackConnectedUserData, + *((uint32_t*)UDPC_Deque_index_ptr(ctx->connectedEvents, 4, x))); + } + UDPC_Deque_clear(ctx->connectedEvents); + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Connected callback not set"); + UDPC_Deque_clear(ctx->connectedEvents); + } + + if(ctx->callbackDisconnected) + { + for(int x = 0; x * 4 < ctx->disconnectedEvents->size; ++x) + { + ctx->callbackDisconnected(ctx->callbackDisconnectedUserData, + *((uint32_t*)UDPC_Deque_index_ptr(ctx->disconnectedEvents, 4, x))); + } + UDPC_Deque_clear(ctx->disconnectedEvents); + } + else + { + UDPC_INTERNAL_log(ctx, 0, "Disconnected callback not set"); + UDPC_Deque_clear(ctx->disconnectedEvents); + } + if(ctx->callbackReceived) { for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x) @@ -249,8 +283,16 @@ int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void * if(pinfo.data) { memcpy(pinfo.data, data, size); - UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo)); - return 1; + if(UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo)) == 0) + { + UDPC_INTERNAL_log(ctx, 1, "Not enough free space in send " + "packet queue, failed to queue packet for sending"); + return 0; + } + else + { + return 1; + } } else { @@ -266,6 +308,14 @@ int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void * } } +int UDPC_get_queue_send_available(UDPC_Context *ctx, uint32_t addr) +{ + UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr); + if(!cd) { return 0; } + + return UDPC_Deque_get_available(cd->sendPktQueue) / sizeof(UDPC_INTERNAL_PacketInfo); +} + uint32_t UDPC_get_error(UDPC_Context *ctx) { uint32_t error = ctx->error; @@ -401,7 +451,7 @@ void UDPC_update(UDPC_Context *ctx) receivedData.sin_addr.s_addr, ntohs(receivedData.sin_port), UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE), - UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE), + UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SEND_PKTS_ALLOC_SIZE), {0, 0}, {0, 0}, 0.0f @@ -409,9 +459,16 @@ void UDPC_update(UDPC_Context *ctx) newCD.received = us.tsNow; newCD.sent = us.tsNow; UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD); - if(ctx->callbackConnected) + if(UDPC_Deque_get_available(ctx->connectedEvents) == 0) { - ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr); + UDPC_Deque_pop_front(ctx->connectedEvents, 4); + UDPC_Deque_push_back(ctx->connectedEvents, &receivedData.sin_addr.s_addr, 4); + UDPC_INTERNAL_log(ctx, 1, "Not enough free space in connected " + "events queue, removing oldest to make room"); + } + else + { + UDPC_Deque_push_back(ctx->connectedEvents, &receivedData.sin_addr.s_addr, 4); } } return; @@ -522,7 +579,19 @@ void UDPC_update(UDPC_Context *ctx) receivedInfo.size = bytes - 20; receivedInfo.sent = us.tsNow; - UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo)); + if(UDPC_Deque_get_available(ctx->receivedPackets) == 0) + { + UDPC_INTERNAL_PacketInfo *rpinfo = UDPC_Deque_get_front_ptr( + ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo)); + if(rpinfo->data) { free(rpinfo->data); } + UDPC_Deque_pop_front(ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo)); + UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo)); + UDPC_INTERNAL_log(ctx, 1, "Received packet but not enough space in received queue, removing oldest packet to make room"); + } + else + { + UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo)); + } } else { @@ -543,9 +612,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data) UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d", UDPC_INTERNAL_atostr(us->ctx, addr), cd->port); - if(us->ctx->callbackDisconnected) + if(UDPC_Deque_get_available(us->ctx->disconnectedEvents) == 0) { - us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr); + UDPC_Deque_pop_front(us->ctx->disconnectedEvents, 4); + UDPC_Deque_push_back(us->ctx->disconnectedEvents, &addr, 4); + UDPC_INTERNAL_log(us->ctx, 1, "Not enough free space in " + "disconnected events queue, removing oldest event to make room"); + } + else + { + UDPC_Deque_push_back(us->ctx->disconnectedEvents, &addr, 4); } return; } @@ -847,6 +923,7 @@ void UDPC_INTERNAL_check_pkt_timeout( pinfo->flags |= 0x4; pinfo->data = NULL; pinfo->size = 0; + // TODO use separate queue for resending packets UDPC_Deque_push_back( cd->sendPktQueue, &newPkt, diff --git a/src/UDPConnection.h b/src/UDPConnection.h index d1290b2..d470bcf 100644 --- a/src/UDPConnection.h +++ b/src/UDPConnection.h @@ -115,6 +115,8 @@ typedef struct { struct timespec lastUpdated; char atostrBuf[UDPC_ATOSTR_BUF_SIZE]; char recvBuf[UDPC_PACKET_MAX_SIZE]; + UDPC_Deque *connectedEvents; + UDPC_Deque *disconnectedEvents; UDPC_Deque *receivedPackets; UDPC_callback_connected callbackConnected; @@ -149,7 +151,7 @@ void UDPC_set_callback_disconnected( void UDPC_set_callback_received( UDPC_Context *ctx, UDPC_callback_received fptr, void *userData); -void UDPC_check_received(UDPC_Context *ctx); +void UDPC_check_events(UDPC_Context *ctx); /*! * \brief Queues a packet to send to a connected peer @@ -161,6 +163,12 @@ void UDPC_check_received(UDPC_Context *ctx); int UDPC_queue_send( UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size); +/*! + * \brief get the number of packets that can be queued to the addr + * \return number of queueable packets or 0 if connection has not been established + */ +int UDPC_get_queue_send_available(UDPC_Context *ctx, uint32_t addr); + uint32_t UDPC_get_error(UDPC_Context *ctx); const char* UDPC_get_error_str(uint32_t error);