Rework sendPkts (sending queue)

Changed sendPkts in ConnectionData to std::deque, added a TSQueue
cSendPkts to Context. Queued packets will be moved to corresponding
ConnectionData instances during update, dropping with a warning those
where a connection does not exist.

Minor other fixes.

Some additions to TSQueue.
This commit is contained in:
Stephen Seo 2019-09-27 20:19:48 +09:00
parent 98e88237ce
commit b11d87ca12
5 changed files with 69 additions and 40 deletions

View file

@ -37,7 +37,9 @@ class TSQueue {
void changeCapacity(unsigned int newCapacity, unsigned int *status);
unsigned int size();
unsigned int capacity();
unsigned int remaining_capacity();
bool empty();
bool full();
private:
std::mutex mutex;
@ -165,6 +167,13 @@ unsigned int TSQueue<T>::capacity() {
return capacity;
}
template <typename T>
unsigned int TSQueue<T>::remaining_capacity() {
std::lock_guard<std::mutex> lock(mutex);
unsigned int remaining = rb.getCapacity() - rb.getSize();
return remaining;
}
template <typename T>
bool TSQueue<T>::empty() {
// No lock required, since this is calling size() that uses a lock
@ -172,4 +181,11 @@ bool TSQueue<T>::empty() {
return size == 0;
}
template <typename T>
bool TSQueue<T>::full() {
// No lock required, calling remaining_capacity() that uses a lock
unsigned int remaining = remaining_capacity();
return remaining == 0;
}
#endif

View file

@ -4,7 +4,7 @@
#define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
#define UDPC_SENT_PKTS_MAX_SIZE 33
#define UDPC_QUEUED_PKTS_MAX_SIZE 32
#define UDPC_QUEUED_PKTS_MAX_SIZE 64
#define UDPC_RECEIVED_PKTS_MAX_SIZE 64
#define UDPC_ID_CONNECT 0x80000000
@ -110,8 +110,8 @@ struct ConnectionData {
uint32_t scope_id;
uint16_t port; // in native order
std::deque<UDPC_PacketInfo> sentPkts;
TSQueue<UDPC_PacketInfo> sendPkts;
TSQueue<UDPC_PacketInfo> priorityPkts;
std::deque<UDPC_PacketInfo> sendPkts;
std::deque<UDPC_PacketInfo> priorityPkts;
// pkt id to pkt shared_ptr
std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap;
std::chrono::steady_clock::time_point received;
@ -266,6 +266,7 @@ public:
// id to ipv6 address and port (as UDPC_ConnectionId)
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
TSQueue<UDPC_PacketInfo> receivedPkts;
TSQueue<UDPC_PacketInfo> cSendPkts;
std::default_random_engine rng_engine;

View file

@ -89,8 +89,8 @@ toggledTimer(std::chrono::steady_clock::duration::zero()),
addr({0}),
port(0),
sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
sendPkts(),
priorityPkts(),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
@ -125,8 +125,8 @@ addr(addr),
scope_id(scope_id),
port(port),
sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
sendPkts(),
priorityPkts(),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
@ -171,6 +171,7 @@ loggingType(UDPC_WARNING),
#endif
atostrBufIndex(0),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
cSendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
rng_engine(),
mutex()
{
@ -312,6 +313,30 @@ void UDPC::Context::update_impl() {
}
}
// move queued in cSendPkts to existing connection's sendPkts
{
unsigned int rsize = 0;
do {
auto next = cSendPkts.top_and_pop_and_rsize(&rsize);
if(next) {
if(auto iter = conMap.find(next.value().receiver);
iter != conMap.end()) {
iter->second.sendPkts.push_back(next.value());
} else {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_WARNING,
"Dropped queued packet to ",
UDPC_atostr(
(UDPC_HContext)this,
next.value().receiver.addr),
", port = ",
next.value().receiver.port,
" due to connection not existing");
}
}
} while(rsize != 0);
}
// update send (only if triggerSend flag is set)
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
if(!iter->second.flags.test(0)) {
@ -479,13 +504,12 @@ void UDPC::Context::update_impl() {
UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
bool isResending = false;
if(!iter->second.priorityPkts.empty()) {
// TODO verify getting struct copy is valid
pInfo = std::move(iter->second.priorityPkts.top().value());
iter->second.priorityPkts.pop();
pInfo = iter->second.priorityPkts.front();
iter->second.priorityPkts.pop_front();
isResending = true;
} else {
pInfo = std::move(iter->second.sendPkts.top().value());
iter->second.sendPkts.pop();
pInfo = iter->second.sendPkts.front();
iter->second.sendPkts.pop_front();
}
std::unique_ptr<char[]> buf = std::make_unique<char[]>(UDPC_FULL_HEADER_SIZE + pInfo.dataSize);
UDPC::preparePacket(
@ -810,7 +834,7 @@ void UDPC::Context::update_impl() {
resendingData.dataSize = sentIter->dataSize - UDPC_FULL_HEADER_SIZE;
std::memcpy(resendingData.data, sentIter->data + UDPC_FULL_HEADER_SIZE, resendingData.dataSize);
resendingData.flags = 0;
iter->second.priorityPkts.push(resendingData);
iter->second.priorityPkts.push_back(resendingData);
}
break;
}
@ -1210,20 +1234,13 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec
}
}
int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
int UDPC_get_queue_send_available(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
std::lock_guard<std::mutex> lock(c->mutex);
auto iter = c->conMap.find(connectionId);
if(iter != c->conMap.end()) {
return iter->second.sendPkts.capacity() - iter->second.sendPkts.size();
} else {
return 0;
}
return c->cSendPkts.remaining_capacity();
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
@ -1235,16 +1252,14 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return;
}
std::lock_guard<std::mutex> lock(c->mutex);
auto iter = c->conMap.find(destinationId);
if(iter == c->conMap.end()) {
} else if(c->cSendPkts.full()) {
UDPC_CHECK_LOG(c,
UDPC_LoggingType::UDPC_ERROR,
"Failed to add packet to queue, no established connection "
"with recipient");
"Failed to queue packet to ",
UDPC_atostr(ctx, destinationId.addr),
", port = ",
destinationId.port,
" because queue is full");
return;
}
@ -1254,10 +1269,10 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
sendInfo.sender.addr = in6addr_loopback;
sendInfo.sender.port = ntohs(c->socketInfo.sin6_port);
sendInfo.receiver.addr = destinationId.addr;
sendInfo.receiver.port = iter->second.port;
sendInfo.receiver.port = destinationId.port;
sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4);
iter->second.sendPkts.push(sendInfo);
c->cSendPkts.push(sendInfo);
}
int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
@ -1265,7 +1280,6 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
if(!c) {
return 0;
}
std::lock_guard<std::mutex> lock(c->mutex);
return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true);
}

View file

@ -105,7 +105,7 @@ void UDPC_update(UDPC_HContext ctx);
void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId);
int UDPC_get_queue_send_available(UDPC_HContext ctx);
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
int isChecked, void *data, uint32_t size);

View file

@ -126,12 +126,10 @@ int main(int argc, char **argv) {
} else if(sendIds.size() > temp) {
sendIds.resize(temp);
}
for(unsigned int i = 0; i < temp; ++i) {
temp2 = UDPC_get_queue_send_available(context, list[i]);
for(unsigned int j = 0; j < temp2; ++j) {
temp3 = htonl(sendIds[i]++);
UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int));
}
temp2 = UDPC_get_queue_send_available(context);
for(unsigned int i = 0; i < temp2; ++i) {
temp3 = htonl(sendIds[i % temp]++);
UDPC_queue_send(context, list[i % temp], 0, &temp3, sizeof(unsigned int));
}
UDPC_free_list_connected(list);
}