Impl add to send-queue and recv callbacks, WIP

Still not finished, need to set up the thread to send queued packets and
receive packets safely.

Also need to add to Deque (see TODO comment).
This commit is contained in:
Stephen Seo 2019-02-20 15:13:57 +09:00
parent 629abf6d8e
commit 69edca40f8
4 changed files with 224 additions and 30 deletions

View file

@ -52,4 +52,6 @@ static const char *UDPC_ERR_THREADFAIL_STR = "Failed to create thread";
#define UDPC_PACKET_TIMEOUT_SEC 1.0f #define UDPC_PACKET_TIMEOUT_SEC 1.0f
#define UDPC_GOOD_RTT_LIMIT_SEC 0.25f #define UDPC_GOOD_RTT_LIMIT_SEC 0.25f
#define UDPC_REC_PKTS_ALLOC_SIZE 64
#endif #endif

View file

@ -40,6 +40,8 @@ int UDPC_Deque_push_back(UDPC_Deque *deque, const void *data, uint32_t size);
*/ */
int UDPC_Deque_push_front(UDPC_Deque *deque, const void *data, uint32_t size); int UDPC_Deque_push_front(UDPC_Deque *deque, const void *data, uint32_t size);
// TODO add push_back/push_front variants that realloc on not enough free space
/*! /*!
* \return size in bytes of available data * \return size in bytes of available data
*/ */

View file

@ -9,10 +9,23 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
{ {
UDPC_Context *context = malloc(sizeof(UDPC_Context)); UDPC_Context *context = malloc(sizeof(UDPC_Context));
context->error = UDPC_SUCCESS; context->error = UDPC_SUCCESS;
context->flags = 0; context->flags = 0xC;
context->threadFlags = 0;
context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0;
if(isClient != 0) context->flags |= 0x2; if(isClient != 0) context->flags |= 0x2;
context->threadFlags = 0;
context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
timespec_get(&context->lastUpdated, TIME_UTC);
context->atostrBuf[UDPC_ATOSTR_BUF_SIZE - 1] = 0;
context->receivedPackets = UDPC_Deque_init(
UDPC_REC_PKTS_ALLOC_SIZE * sizeof(UDPC_INTERNAL_PacketInfo));
context->callbackConnected = NULL;
context->callbackConnectedUserData = NULL;
context->callbackDisconnected = NULL;
context->callbackDisconnectedUserData = NULL;
context->callbackReceived = NULL;
context->callbackReceivedUserData = NULL;
// seed rand
srand(context->lastUpdated.tv_sec);
// create socket // create socket
context->socketHandle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); context->socketHandle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
@ -20,7 +33,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
{ {
context->socketHandle = 0; context->socketHandle = 0;
context->error = UDPC_ERR_SOCKETFAIL; context->error = UDPC_ERR_SOCKETFAIL;
fprintf(stderr, "Failed to create socket\n"); UDPC_INTERNAL_log(context, 0, "Failed to create socket");
return context; return context;
} }
@ -37,7 +50,7 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
context->error = UDPC_ERR_SOCKETBINDF; context->error = UDPC_ERR_SOCKETBINDF;
CleanupSocket(context->socketHandle); CleanupSocket(context->socketHandle);
context->socketHandle = 0; context->socketHandle = 0;
fprintf(stderr, "Failed to bind socket\n"); UDPC_INTERNAL_log(context, 0, "Failed to bind socket");
return context; return context;
} }
@ -56,19 +69,13 @@ UDPC_Context* UDPC_init(uint16_t listenPort, int isClient)
context->error = UDPC_ERR_SOCKETNONBF; context->error = UDPC_ERR_SOCKETNONBF;
CleanupSocket(context->socketHandle); CleanupSocket(context->socketHandle);
context->socketHandle = 0; context->socketHandle = 0;
fprintf(stderr, "Failed to set non-blocking on socket\n"); UDPC_INTERNAL_log(context, 0, "Failed to set non-blocking on socket");
#if UDPC_PLATFORM == UDPC_PLATFORM_UNKNOWN #if UDPC_PLATFORM == UDPC_PLATFORM_UNKNOWN
fprintf(stderr, "(Unknown platform)\n"); UDPC_INTERNAL_log(context, 0, "(Unknown platform)");
#endif #endif
return context; return context;
} }
context->conMap = UDPC_HashMap_init(13, sizeof(UDPC_INTERNAL_ConnectionData));
timespec_get(&context->lastUpdated, TIME_UTC);
context->flags |= (0x8 | 0x4);
return context; return context;
} }
@ -81,8 +88,9 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
{ {
CleanupSocket(context->socketHandle); CleanupSocket(context->socketHandle);
context->socketHandle = 0; context->socketHandle = 0;
fprintf(stderr, "Failed to create mutex\n"); UDPC_INTERNAL_log(context, 0, "Failed to create mutex");
context->error = UDPC_ERR_MTXFAIL; context->error = UDPC_ERR_MTXFAIL;
return context;
} }
context->error = UDPC_SUCCESS; context->error = UDPC_SUCCESS;
@ -92,7 +100,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
CleanupSocket(context->socketHandle); CleanupSocket(context->socketHandle);
context->socketHandle = 0; context->socketHandle = 0;
mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tCVMtx);
fprintf(stderr, "Failed to create mutex\n"); UDPC_INTERNAL_log(context, 0, "Failed to create mutex");
context->error = UDPC_ERR_MTXFAIL; context->error = UDPC_ERR_MTXFAIL;
return context; return context;
} }
@ -105,7 +113,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
context->socketHandle = 0; context->socketHandle = 0;
mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tCVMtx);
mtx_destroy(&context->tflagsMtx); mtx_destroy(&context->tflagsMtx);
fprintf(stderr, "Failed to create condition variable\n"); UDPC_INTERNAL_log(context, 0, "Failed to create condition variable");
context->error = UDPC_ERR_CVFAIL; context->error = UDPC_ERR_CVFAIL;
return context; return context;
} }
@ -120,7 +128,7 @@ UDPC_Context* UDPC_init_threaded_update(uint16_t listenPort, int isClient)
mtx_destroy(&context->tCVMtx); mtx_destroy(&context->tCVMtx);
mtx_destroy(&context->tflagsMtx); mtx_destroy(&context->tflagsMtx);
cnd_destroy(&context->threadCV); cnd_destroy(&context->threadCV);
fprintf(stderr, "Failed to create thread\n"); UDPC_INTERNAL_log(context, 0, "Failed to create thread");
context->error = UDPC_ERR_THREADFAIL; context->error = UDPC_ERR_THREADFAIL;
return context; return context;
} }
@ -177,6 +185,87 @@ void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data)
UDPC_Deque_destroy(cd->sendPktQueue); UDPC_Deque_destroy(cd->sendPktQueue);
} }
void UDPC_set_callback_connected(
UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData)
{
ctx->callbackConnected = fptr;
ctx->callbackConnectedUserData = userData;
}
void UDPC_set_callback_disconnected(
UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData)
{
ctx->callbackDisconnected = fptr;
ctx->callbackDisconnectedUserData = userData;
}
void UDPC_set_callback_received(
UDPC_Context *ctx, UDPC_callback_received fptr, void *userData)
{
ctx->callbackReceived = fptr;
ctx->callbackReceivedUserData = userData;
}
void UDPC_check_received(UDPC_Context *ctx)
{
// TODO use a lock on receivedPackets
if(ctx->callbackReceived)
{
for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
{
UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(
ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x);
ctx->callbackReceived(ctx->callbackReceivedUserData, pinfo->data, pinfo->size);
free(pinfo->data);
}
UDPC_Deque_clear(ctx->receivedPackets);
}
else
{
UDPC_INTERNAL_log(ctx, 0, "Received callback not set");
for(int x = 0; x * sizeof(UDPC_INTERNAL_PacketInfo) < ctx->receivedPackets->size; ++x)
{
UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(
ctx->receivedPackets, sizeof(UDPC_INTERNAL_PacketInfo), x);
free(pinfo->data);
}
UDPC_Deque_clear(ctx->receivedPackets);
}
}
int UDPC_queue_send(UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size)
{
UDPC_INTERNAL_ConnectionData *cd = UDPC_HashMap_get(ctx->conMap, addr);
if(cd)
{
UDPC_INTERNAL_PacketInfo pinfo = {
addr,
0,
isChecked != 0 ? 0x2 : 0,
malloc(size),
size,
{0, 0}
};
if(pinfo.data)
{
memcpy(pinfo.data, data, size);
UDPC_Deque_push_back(cd->sendPktQueue, &pinfo, sizeof(UDPC_INTERNAL_PacketInfo));
return 1;
}
else
{
UDPC_INTERNAL_log(ctx, 0, "Failed to allocate memory to new send-packet queue entry");
return 0;
}
}
else
{
UDPC_INTERNAL_log(ctx, 0, "Cannot send to %s when connection has not been esablished",
UDPC_INTERNAL_atostr(ctx, addr));
return 0;
}
}
uint32_t UDPC_get_error(UDPC_Context *ctx) uint32_t UDPC_get_error(UDPC_Context *ctx)
{ {
uint32_t error = ctx->error; uint32_t error = ctx->error;
@ -233,7 +322,7 @@ void UDPC_update(UDPC_Context *ctx)
ctx ctx
}; };
timespec_get(&us.tsNow, TIME_UTC); timespec_get(&us.tsNow, TIME_UTC);
us.dt = UDPC_ts_diff_to_seconds(&us.tsNow, &ctx->lastUpdated); us.dt = UDPC_INTERNAL_ts_diff(&us.tsNow, &ctx->lastUpdated);
ctx->lastUpdated = us.tsNow; ctx->lastUpdated = us.tsNow;
us.removedQueue = UDPC_Deque_init(4 * (ctx->conMap->size)); us.removedQueue = UDPC_Deque_init(4 * (ctx->conMap->size));
@ -301,7 +390,7 @@ void UDPC_update(UDPC_Context *ctx)
ntohs(receivedData.sin_port)); ntohs(receivedData.sin_port));
UDPC_INTERNAL_ConnectionData newCD = { UDPC_INTERNAL_ConnectionData newCD = {
1, 1,
0, (ctx->flags & 0x2) != 0 ? conID : UDPC_INTERNAL_generate_id(ctx),
0, 0,
0, 0,
0xFFFFFFFF, 0xFFFFFFFF,
@ -317,9 +406,13 @@ void UDPC_update(UDPC_Context *ctx)
{0, 0}, {0, 0},
0.0f 0.0f
}; };
timespec_get(&newCD.received, TIME_UTC); newCD.received = us.tsNow;
timespec_get(&newCD.sent, TIME_UTC); newCD.sent = us.tsNow;
UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD); UDPC_HashMap_insert(ctx->conMap, newCD.addr, &newCD);
if(ctx->callbackConnected)
{
ctx->callbackConnected(ctx->callbackConnectedUserData, receivedData.sin_addr.s_addr);
}
} }
return; return;
} }
@ -418,7 +511,23 @@ void UDPC_update(UDPC_Context *ctx)
UDPC_INTERNAL_atostr(ctx, cd->addr)); UDPC_INTERNAL_atostr(ctx, cd->addr));
} }
// TODO received packet callback here if(bytes > 20)
{
UDPC_INTERNAL_PacketInfo receivedInfo;
receivedInfo.addr = receivedData.sin_addr.s_addr;
receivedInfo.id = conID;
receivedInfo.flags = (isNotRecvCheck != 0 ? 0 : 0x2) | (isResent != 0 ? 0x5 : 0);
receivedInfo.data = malloc(bytes - 20);
memcpy(receivedInfo.data, ctx->recvBuf + 20, bytes - 20);
receivedInfo.size = bytes - 20;
receivedInfo.sent = us.tsNow;
UDPC_Deque_push_back(ctx->receivedPackets, &receivedInfo, sizeof(UDPC_INTERNAL_PacketInfo));
}
else
{
UDPC_INTERNAL_log(ctx, 3, "Packet has no payload, not adding to received queue");
}
} }
void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data) void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data)
@ -428,12 +537,16 @@ void UDPC_INTERNAL_update_to_rtt_si(void *userData, uint32_t addr, char *data)
UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data; UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data;
// check for timed out connection // check for timed out connection
if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS) if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->received) >= UDPC_TIMEOUT_SECONDS)
{ {
UDPC_Deque_push_back(us->removedQueue, &addr, 4); UDPC_Deque_push_back(us->removedQueue, &addr, 4);
UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d", UDPC_INTERNAL_log(us->ctx, 2, "Connection timed out with addr %s port %d",
UDPC_INTERNAL_atostr(us->ctx, addr), UDPC_INTERNAL_atostr(us->ctx, addr),
cd->port); cd->port);
if(us->ctx->callbackDisconnected)
{
us->ctx->callbackDisconnected(us->ctx->callbackDisconnectedUserData, addr);
}
return; return;
} }
@ -513,7 +626,7 @@ void UDPC_INTERNAL_update_send(void *userData, uint32_t addr, char *data)
if(cd->sendPktQueue->size == 0) if(cd->sendPktQueue->size == 0)
{ {
// send packet queue is empty, send heartbeat packet // send packet queue is empty, send heartbeat packet
if(UDPC_ts_diff_to_seconds(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL) if(UDPC_INTERNAL_ts_diff(&us->tsNow, &cd->sent) < UDPC_HEARTBEAT_PKT_INTERVAL)
{ {
return; return;
} }
@ -653,7 +766,7 @@ void UDPC_INTERNAL_update_rtt(
UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(cd->sentPkts, sizeof(UDPC_INTERNAL_PacketInfo), x); UDPC_INTERNAL_PacketInfo *pinfo = UDPC_Deque_index_ptr(cd->sentPkts, sizeof(UDPC_INTERNAL_PacketInfo), x);
if(pinfo->id == rseq) if(pinfo->id == rseq)
{ {
float diff = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent); float diff = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent);
if(diff > cd->rtt) if(diff > cd->rtt)
{ {
cd->rtt += (diff - cd->rtt) / 10.0f; cd->rtt += (diff - cd->rtt) / 10.0f;
@ -705,7 +818,7 @@ void UDPC_INTERNAL_check_pkt_timeout(
// is not received checked or already resent // is not received checked or already resent
break; break;
} }
float seconds = UDPC_ts_diff_to_seconds(tsNow, &pinfo->sent); float seconds = UDPC_INTERNAL_ts_diff(tsNow, &pinfo->sent);
if(seconds >= UDPC_PACKET_TIMEOUT_SEC) if(seconds >= UDPC_PACKET_TIMEOUT_SEC)
{ {
if(pinfo->size <= 20) if(pinfo->size <= 20)
@ -747,7 +860,7 @@ void UDPC_INTERNAL_check_pkt_timeout(
} }
} }
float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1) float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1)
{ {
float sec = 0.0f; float sec = 0.0f;
if(!ts0 || !ts1) if(!ts0 || !ts1)
@ -814,6 +927,7 @@ int UDPC_INTERNAL_threadfn(void *context)
} }
mtx_lock(&ctx->tCVMtx); mtx_lock(&ctx->tCVMtx);
cnd_timedwait(&ctx->threadCV, &ctx->tCVMtx, &ts); cnd_timedwait(&ctx->threadCV, &ctx->tCVMtx, &ts);
UDPC_update(ctx);
mtx_unlock(&ctx->tCVMtx); mtx_unlock(&ctx->tCVMtx);
mtx_lock(&ctx->tflagsMtx); mtx_lock(&ctx->tflagsMtx);
@ -916,3 +1030,25 @@ char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr)
return ctx->atostrBuf; return ctx->atostrBuf;
} }
uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx)
{
uint32_t newID = 0x10000000;
while(newID == 0x10000000)
{
newID = rand() % 0x10000000;
UDPC_HashMap_itercall(ctx->conMap, UDPC_INTERNAL_check_ids, &newID);
}
return newID;
}
void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data)
{
UDPC_INTERNAL_ConnectionData *cd = (UDPC_INTERNAL_ConnectionData*)data;
if(cd->id == *((uint32_t*)userData))
{
*((uint32_t*)userData) = 0x10000000;
}
}

View file

@ -27,6 +27,27 @@
#define UDPC_ATOSTR_BUF_SIZE 16 #define UDPC_ATOSTR_BUF_SIZE 16
/// (void *userData, uint32_t address)
/*!
* Note address is in network byte order (usually big-endian)
*/
typedef void (*UDPC_callback_connected)(void*, uint32_t);
/// (void *userData, uint32_t address)
/*!
* Note address is in network byte order (usually big-endian)
*/
typedef void (*UDPC_callback_disconnected)(void*, uint32_t);
/// (void *userData, char *packetData, uint32_t packetSize)
/*!
* The data pointed to by the packetData argument is to data internally managed
* by the UDPC_Context. It will change every time this callback is called so do
* not depend on it persisting. This means you should copy the data out of it
* when the callback is invoked and work with the copied data.
*/
typedef void (*UDPC_callback_received)(void*, char*, uint32_t);
/// This struct should not be used outside of this library /// This struct should not be used outside of this library
typedef struct { typedef struct {
uint32_t addr; // in network order (big-endian) uint32_t addr; // in network order (big-endian)
@ -37,7 +58,7 @@ typedef struct {
* 0x4 - has been re-sent * 0x4 - has been re-sent
*/ */
uint32_t flags; uint32_t flags;
char *data; // no-header in sendPktQueue, header in sentPkts char *data; // no-header in sendPktQueue and receivedPackets, header in sentPkts
uint32_t size; uint32_t size;
struct timespec sent; struct timespec sent;
} UDPC_INTERNAL_PacketInfo; } UDPC_INTERNAL_PacketInfo;
@ -94,6 +115,14 @@ typedef struct {
struct timespec lastUpdated; struct timespec lastUpdated;
char atostrBuf[UDPC_ATOSTR_BUF_SIZE]; char atostrBuf[UDPC_ATOSTR_BUF_SIZE];
char recvBuf[UDPC_PACKET_MAX_SIZE]; char recvBuf[UDPC_PACKET_MAX_SIZE];
UDPC_Deque *receivedPackets;
UDPC_callback_connected callbackConnected;
void *callbackConnectedUserData;
UDPC_callback_disconnected callbackDisconnected;
void *callbackDisconnectedUserData;
UDPC_callback_received callbackReceived;
void *callbackReceivedUserData;
} UDPC_Context; } UDPC_Context;
typedef struct { typedef struct {
@ -111,6 +140,27 @@ void UDPC_destroy(UDPC_Context *ctx);
void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data); void UDPC_INTERNAL_destroy_conMap(void *unused, uint32_t addr, char *data);
void UDPC_set_callback_connected(
UDPC_Context *ctx, UDPC_callback_connected fptr, void *userData);
void UDPC_set_callback_disconnected(
UDPC_Context *ctx, UDPC_callback_disconnected fptr, void *userData);
void UDPC_set_callback_received(
UDPC_Context *ctx, UDPC_callback_received fptr, void *userData);
void UDPC_check_received(UDPC_Context *ctx);
/*!
* \brief Queues a packet to send to a connected peer
* Note addr is expected to be in network-byte-order (big-endian).
* If isChecked is non-zero, UDPC will attempt to resend the packet if peer has
* not received it within UDPC_PACKET_TIMEOUT_SEC seconds.
* \return non-zero on success
*/
int UDPC_queue_send(
UDPC_Context *ctx, uint32_t addr, uint32_t isChecked, void *data, uint32_t size);
uint32_t UDPC_get_error(UDPC_Context *ctx); uint32_t UDPC_get_error(UDPC_Context *ctx);
const char* UDPC_get_error_str(uint32_t error); const char* UDPC_get_error_str(uint32_t error);
@ -146,9 +196,9 @@ void UDPC_INTERNAL_check_pkt_timeout(
uint32_t ack, uint32_t ack,
struct timespec *tsNow); struct timespec *tsNow);
float UDPC_ts_diff_to_seconds(struct timespec *ts0, struct timespec *ts1); float UDPC_INTERNAL_ts_diff(struct timespec *ts0, struct timespec *ts1);
int UDPC_INTERNAL_threadfn(void *context); // internal usage only int UDPC_INTERNAL_threadfn(void *context);
/* /*
* 0x1 - is ping * 0x1 - is ping
@ -173,4 +223,8 @@ void UDPC_INTERNAL_log(UDPC_Context *ctx, uint32_t level, const char *msg, ...);
char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr); char* UDPC_INTERNAL_atostr(UDPC_Context *ctx, uint32_t addr);
uint32_t UDPC_INTERNAL_generate_id(UDPC_Context *ctx);
void UDPC_INTERNAL_check_ids(void *userData, uint32_t addr, char *data);
#endif #endif