Fixes/improvements to UDPConnection, still WIP
Need to implement threaded update properly.
This commit is contained in:
parent
29e3940c14
commit
d6f5653048
3 changed files with 100 additions and 11 deletions
|
@ -46,12 +46,16 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread";
|
||||||
|
|
||||||
#define UDPC_SENT_PKTS_MAX_SIZE 34
|
#define UDPC_SENT_PKTS_MAX_SIZE 34
|
||||||
#define UDPC_SENT_PKTS_ALLOC_SIZE 35
|
#define UDPC_SENT_PKTS_ALLOC_SIZE 35
|
||||||
|
#define UDPC_SEND_PKTS_ALLOC_SIZE 40
|
||||||
|
|
||||||
#define UDPC_PACKET_MAX_SIZE 8192
|
#define UDPC_PACKET_MAX_SIZE 8192
|
||||||
|
|
||||||
#define UDPC_PACKET_TIMEOUT_SEC 1.0f
|
#define UDPC_PACKET_TIMEOUT_SEC 1.0f
|
||||||
#define UDPC_GOOD_RTT_LIMIT_SEC 0.25f
|
#define UDPC_GOOD_RTT_LIMIT_SEC 0.25f
|
||||||
|
|
||||||
#define UDPC_REC_PKTS_ALLOC_SIZE 64
|
#define UDPC_REC_PKTS_ALLOC_SIZE 128
|
||||||
|
|
||||||
|
#define UDPC_CONNECTED_EVENT_SIZE 64
|
||||||
|
#define UDPC_DISCONNECTED_EVENT_SIZE 64
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -15,6 +15,10 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
|
||||||
context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
|
context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
|
||||||
timespec_get(&context->lastUpdated, TIME_UTC);
|
timespec_get(&context->lastUpdated, TIME_UTC);
|
||||||
context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0;
|
context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0;
|
||||||
|
context->connectedEvents = UDPC_Deque_init(
|
||||||
|
UDPC_CONNECTED_EVENT_SIZE * 4);
|
||||||
|
context->disconnectedEvents = UDPC_Deque_init(
|
||||||
|
UDPC_DISCONNECTED_EVENT_SIZE * 4);
|
||||||
context->receivedPackets = UDPC_Deque_init(
|
context->receivedPackets = UDPC_Deque_init(
|
||||||
UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo));
|
UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo));
|
||||||
context->callbackConnected = NULL;
|
context->callbackConnected = NULL;
|
||||||
|
@ -206,9 +210,39 @@ void UDPC_set_callback_received(
|
||||||
ctx->callbackReceivedUserData = userData;
|
ctx->callbackReceivedUserData = userData;
|
||||||
}
|
}
|
||||||
|
|
||||||
void UDPC_check_received(UDPC_Context *ctx)
|
void UDPC_check_events(UDPC_Context *ctx)
|
||||||
{
|
{
|
||||||
// TODO use a lock on receivedPackets
|
// TODO use a lock on receivedPackets
|
||||||
|
if(ctx->callbackConnected)
|
||||||
|
{
|
||||||
|
for(int x = 0; x * 4 < ctx->connectedEvents->size; ++x)
|
||||||
|
{
|
||||||
|
ctx->callbackConnected(ctx->callbackConnectedUserData,
|
||||||
|
*((uint32_t*)UDPC_Deque_index_ptr(ctx->connectedEvents, 4, x)));
|
||||||
|
}
|
||||||
|
UDPC_Deque_clear(ctx->connectedEvents);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UDPC_INTERNAL_log(ctx, 0, "Connected callback not set");
|
||||||
|
UDPC_Deque_clear(ctx->connectedEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(ctx->callbackDisconnected)
|
||||||
|
{
|
||||||
|
for(int x = 0; x * 4 < ctx->disconnectedEvents->size; ++x)
|
||||||
|
{
|
||||||
|
ctx->callbackDisconnected(ctx->callbackDisconnectedUserData,
|
||||||
|
*((uint32_t*)UDPC_Deque_index_ptr(ctx->disconnectedEvents, 4, x)));
|
||||||
|
}
|
||||||
|
UDPC_Deque_clear(ctx->disconnectedEvents);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UDPC_INTERNAL_log(ctx, 0, "Disconnected callback not set");
|
||||||
|
UDPC_Deque_clear(ctx->disconnectedEvents);
|
||||||
|
}
|
||||||
|
|
||||||
if(ctx->callbackReceived)
|
if(ctx->callbackReceived)
|
||||||
{
|
{
|
||||||
for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
|
for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
|
||||||
|
@ -249,9 +283,17 @@ int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *
|
||||||
if(pinfo.data)
|
if(pinfo.data)
|
||||||
{
|
{
|
||||||
memcpy(pinfo.data, data, size);
|
memcpy(pinfo.data, data, size);
|
||||||
UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo));
|
if(UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo)) == 0)
|
||||||
|
{
|
||||||
|
UDPC_INTERNAL_log(ctx, 1, "Not enough free space in send "
|
||||||
|
"packet queue, failed to queue packet for sending");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
UDPC_INTERNAL_log(ctx, 0, "Failed to allocate memory to new send-packet queue entry");
|
UDPC_INTERNAL_log(ctx, 0, "Failed to allocate memory to new send-packet queue entry");
|
||||||
|
@ -266,6 +308,14 @@ int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int UDPC_get_queue_send_available(UDPC_Context *ctx, uint32_t addr)
|
||||||
|
{
|
||||||
|
UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr);
|
||||||
|
if(!cd) { return 0; }
|
||||||
|
|
||||||
|
return UDPC_Deque_get_available(cd->sendPktQueue) / sizeof(UDPC_INTERNAL_PacketInfo);
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t UDPC_get_error(UDPC_Context *ctx)
|
uint32_t UDPC_get_error(UDPC_Context *ctx)
|
||||||
{
|
{
|
||||||
uint32_t error = ctx->error;
|
uint32_t error = ctx->error;
|
||||||
|
@ -401,7 +451,7 @@ void UDPC_update(UDPC_Context *ctx)
|
||||||
receivedData.sin_addr.s_addr,
|
receivedData.sin_addr.s_addr,
|
||||||
ntohs(receivedData.sin_port),
|
ntohs(receivedData.sin_port),
|
||||||
UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE),
|
UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE),
|
||||||
UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SENT_PKTS_ALLOC_SIZE),
|
UDPC_Deque_init(sizeof(UDPC_INTERNAL_PacketInfo) * UDPC_SEND_PKTS_ALLOC_SIZE),
|
||||||
{0, 0},
|
{0, 0},
|
||||||
{0, 0},
|
{0, 0},
|
||||||
0.0f
|
0.0f
|
||||||
|
@ -409,9 +459,16 @@ void UDPC_update(UDPC_Context *ctx)
|
||||||
newCD.received = us.tsNow;
|
newCD.received = us.tsNow;
|
||||||
newCD.sent = us.tsNow;
|
newCD.sent = us.tsNow;
|
||||||
UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD);
|
UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD);
|
||||||
if(ctx->callbackConnected)
|
if(UDPC_Deque_get_available(ctx->connectedEvents) == 0)
|
||||||
{
|
{
|
||||||
ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr);
|
UDPC_Deque_pop_front(ctx->connectedEvents, 4);
|
||||||
|
UDPC_Deque_push_back(ctx->connectedEvents, &receivedData.sin_addr.s_addr, 4);
|
||||||
|
UDPC_INTERNAL_log(ctx, 1, "Not enough free space in connected "
|
||||||
|
"events queue, removing oldest to make room");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UDPC_Deque_push_back(ctx->connectedEvents, &receivedData.sin_addr.s_addr, 4);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -522,7 +579,19 @@ void UDPC_update(UDPC_Context *ctx)
|
||||||
receivedInfo.size = bytes - 20;
|
receivedInfo.size = bytes - 20;
|
||||||
receivedInfo.sent = us.tsNow;
|
receivedInfo.sent = us.tsNow;
|
||||||
|
|
||||||
|
if(UDPC_Deque_get_available(ctx->receivedPackets) == 0)
|
||||||
|
{
|
||||||
|
UDPC_INTERNAL_PacketInfo *rpinfo = UDPC_Deque_get_front_ptr(
|
||||||
|
ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo));
|
||||||
|
if(rpinfo->data) { free(rpinfo->data); }
|
||||||
|
UDPC_Deque_pop_front(ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo));
|
||||||
UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
|
UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
|
||||||
|
UDPC_INTERNAL_log(ctx, 1, "Received packet but not enough space in received queue, removing oldest packet to make room");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -543,9 +612,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data)
|
||||||
UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d",
|
UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d",
|
||||||
UDPC_INTERNAL_atostr(us->ctx, addr),
|
UDPC_INTERNAL_atostr(us->ctx, addr),
|
||||||
cd->port);
|
cd->port);
|
||||||
if(us->ctx->callbackDisconnected)
|
if(UDPC_Deque_get_available(us->ctx->disconnectedEvents) == 0)
|
||||||
{
|
{
|
||||||
us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr);
|
UDPC_Deque_pop_front(us->ctx->disconnectedEvents, 4);
|
||||||
|
UDPC_Deque_push_back(us->ctx->disconnectedEvents, &addr, 4);
|
||||||
|
UDPC_INTERNAL_log(us->ctx, 1, "Not enough free space in "
|
||||||
|
"disconnected events queue, removing oldest event to make room");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UDPC_Deque_push_back(us->ctx->disconnectedEvents, &addr, 4);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -847,6 +923,7 @@ void UDPC_INTERNAL_check_pkt_timeout(
|
||||||
pinfo->flags |= 0x4;
|
pinfo->flags |= 0x4;
|
||||||
pinfo->data = NULL;
|
pinfo->data = NULL;
|
||||||
pinfo->size = 0;
|
pinfo->size = 0;
|
||||||
|
// TODO use separate queue for resending packets
|
||||||
UDPC_Deque_push_back(
|
UDPC_Deque_push_back(
|
||||||
cd->sendPktQueue,
|
cd->sendPktQueue,
|
||||||
&newPkt,
|
&newPkt,
|
||||||
|
|
|
@ -115,6 +115,8 @@ typedef struct {
|
||||||
struct timespec lastUpdated;
|
struct timespec lastUpdated;
|
||||||
char atostrBuf[UDPC_ATOSTR_BUF_SIZE];
|
char atostrBuf[UDPC_ATOSTR_BUF_SIZE];
|
||||||
char recvBuf[UDPC_PACKET_MAX_SIZE];
|
char recvBuf[UDPC_PACKET_MAX_SIZE];
|
||||||
|
UDPC_Deque *connectedEvents;
|
||||||
|
UDPC_Deque *disconnectedEvents;
|
||||||
UDPC_Deque *receivedPackets;
|
UDPC_Deque *receivedPackets;
|
||||||
|
|
||||||
UDPC_callback_connected callbackConnected;
|
UDPC_callback_connected callbackConnected;
|
||||||
|
@ -149,7 +151,7 @@ void UDPC_set_callback_disconnected(
|
||||||
void UDPC_set_callback_received(
|
void UDPC_set_callback_received(
|
||||||
UDPC_Context *ctx, UDPC_callback_received fptr, void *userData);
|
UDPC_Context *ctx, UDPC_callback_received fptr, void *userData);
|
||||||
|
|
||||||
void UDPC_check_received(UDPC_Context *ctx);
|
void UDPC_check_events(UDPC_Context *ctx);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Queues a packet to send to a connected peer
|
* \brief Queues a packet to send to a connected peer
|
||||||
|
@ -161,6 +163,12 @@ void UDPC_check_received(UDPC_Context *ctx);
|
||||||
int UDPC_queue_send(
|
int UDPC_queue_send(
|
||||||
UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size);
|
UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief get the number of packets that can be queued to the addr
|
||||||
|
* \return number of queueable packets or 0 if connection has not been established
|
||||||
|
*/
|
||||||
|
int UDPC_get_queue_send_available(UDPC_Context *ctx, uint32_t addr);
|
||||||
|
|
||||||
uint32_t UDPC_get_error(UDPC_Context *ctx);
|
uint32_t UDPC_get_error(UDPC_Context *ctx);
|
||||||
|
|
||||||
const char* UDPC_get_error_str(uint32_t error);
|
const char* UDPC_get_error_str(uint32_t error);
|
||||||
|
|
Loading…
Reference in a new issue