From b11d87ca1291484cf83fa81bd58f2250de0460b1 Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Fri, 27 Sep 2019 20:19:48 +0900 Subject: [PATCH] Rework sendPkts (sending queue) Changed sendPkts in ConnectionData to std::deque, added a TSQueue cSendPkts to Context. Queued packets will be moved to corresponding ConnectionData instances during update, dropping with a warning those where a connection does not exist. Minor other fixes. Some additions to TSQueue. --- cpp_impl/src/TSQueue.hpp | 16 ++++++ cpp_impl/src/UDPC_Defines.hpp | 7 +-- cpp_impl/src/UDPConnection.cpp | 74 +++++++++++++++----------- cpp_impl/src/UDPConnection.h | 2 +- cpp_impl/src/test/UDPC_NetworkTest.cpp | 10 ++-- 5 files changed, 69 insertions(+), 40 deletions(-) diff --git a/cpp_impl/src/TSQueue.hpp b/cpp_impl/src/TSQueue.hpp index 2352cf8..049916e 100644 --- a/cpp_impl/src/TSQueue.hpp +++ b/cpp_impl/src/TSQueue.hpp @@ -37,7 +37,9 @@ class TSQueue { void changeCapacity(unsigned int newCapacity, unsigned int *status); unsigned int size(); unsigned int capacity(); + unsigned int remaining_capacity(); bool empty(); + bool full(); private: std::mutex mutex; @@ -165,6 +167,13 @@ unsigned int TSQueue::capacity() { return capacity; } +template +unsigned int TSQueue::remaining_capacity() { + std::lock_guard lock(mutex); + unsigned int remaining = rb.getCapacity() - rb.getSize(); + return remaining; +} + template bool TSQueue::empty() { // No lock required, since this is calling size() that uses a lock @@ -172,4 +181,11 @@ bool TSQueue::empty() { return size == 0; } +template +bool TSQueue::full() { + // No lock required, calling remaining_capacity() that uses a lock + unsigned int remaining = remaining_capacity(); + return remaining == 0; +} + #endif diff --git a/cpp_impl/src/UDPC_Defines.hpp b/cpp_impl/src/UDPC_Defines.hpp index 50a1170..25bf295 100644 --- a/cpp_impl/src/UDPC_Defines.hpp +++ b/cpp_impl/src/UDPC_Defines.hpp @@ -4,7 +4,7 @@ #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3 #define UDPC_SENT_PKTS_MAX_SIZE 33 -#define UDPC_QUEUED_PKTS_MAX_SIZE 32 +#define UDPC_QUEUED_PKTS_MAX_SIZE 64 #define UDPC_RECEIVED_PKTS_MAX_SIZE 64 #define UDPC_ID_CONNECT 0x80000000 @@ -110,8 +110,8 @@ struct ConnectionData { uint32_t scope_id; uint16_t port; // in native order std::deque sentPkts; - TSQueue sendPkts; - TSQueue priorityPkts; + std::deque sendPkts; + std::deque priorityPkts; // pkt id to pkt shared_ptr std::unordered_map sentInfoMap; std::chrono::steady_clock::time_point received; @@ -266,6 +266,7 @@ public: // id to ipv6 address and port (as UDPC_ConnectionId) std::unordered_map idMap; TSQueue receivedPkts; + TSQueue cSendPkts; std::default_random_engine rng_engine; diff --git a/cpp_impl/src/UDPConnection.cpp b/cpp_impl/src/UDPConnection.cpp index 1b6ee26..aba8b35 100644 --- a/cpp_impl/src/UDPConnection.cpp +++ b/cpp_impl/src/UDPConnection.cpp @@ -89,8 +89,8 @@ toggledTimer(std::chrono::steady_clock::duration::zero()), addr({0}), port(0), sentPkts(), -sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), -priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), +sendPkts(), +priorityPkts(), received(std::chrono::steady_clock::now()), sent(std::chrono::steady_clock::now()), rtt(std::chrono::steady_clock::duration::zero()) @@ -125,8 +125,8 @@ addr(addr), scope_id(scope_id), port(port), sentPkts(), -sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), -priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), +sendPkts(), +priorityPkts(), received(std::chrono::steady_clock::now()), sent(std::chrono::steady_clock::now()), rtt(std::chrono::steady_clock::duration::zero()) @@ -171,6 +171,7 @@ loggingType(UDPC_WARNING), #endif atostrBufIndex(0), receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE), +cSendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), rng_engine(), mutex() { @@ -312,6 +313,30 @@ void UDPC::Context::update_impl() { } } + // move queued in cSendPkts to existing connection's sendPkts + { + unsigned int rsize = 0; + do { + auto next = cSendPkts.top_and_pop_and_rsize(&rsize); + if(next) { + if(auto iter = conMap.find(next.value().receiver); + iter != conMap.end()) { + iter->second.sendPkts.push_back(next.value()); + } else { + UDPC_CHECK_LOG(this, + UDPC_LoggingType::UDPC_WARNING, + "Dropped queued packet to ", + UDPC_atostr( + (UDPC_HContext)this, + next.value().receiver.addr), + ", port = ", + next.value().receiver.port, + " due to connection not existing"); + } + } + } while(rsize != 0); + } + // update send (only if triggerSend flag is set) for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) { if(!iter->second.flags.test(0)) { @@ -479,13 +504,12 @@ void UDPC::Context::update_impl() { UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); bool isResending = false; if(!iter->second.priorityPkts.empty()) { - // TODO verify getting struct copy is valid - pInfo = std::move(iter->second.priorityPkts.top().value()); - iter->second.priorityPkts.pop(); + pInfo = iter->second.priorityPkts.front(); + iter->second.priorityPkts.pop_front(); isResending = true; } else { - pInfo = std::move(iter->second.sendPkts.top().value()); - iter->second.sendPkts.pop(); + pInfo = iter->second.sendPkts.front(); + iter->second.sendPkts.pop_front(); } std::unique_ptr buf = std::make_unique(UDPC_FULL_HEADER_SIZE + pInfo.dataSize); UDPC::preparePacket( @@ -810,7 +834,7 @@ void UDPC::Context::update_impl() { resendingData.dataSize = sentIter->dataSize - UDPC_FULL_HEADER_SIZE; std::memcpy(resendingData.data, sentIter->data + UDPC_FULL_HEADER_SIZE, resendingData.dataSize); resendingData.flags = 0; - iter->second.priorityPkts.push(resendingData); + iter->second.priorityPkts.push_back(resendingData); } break; } @@ -1210,20 +1234,13 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec } } -int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { +int UDPC_get_queue_send_available(UDPC_HContext ctx) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { return 0; } - std::lock_guard lock(c->mutex); - - auto iter = c->conMap.find(connectionId); - if(iter != c->conMap.end()) { - return iter->second.sendPkts.capacity() - iter->second.sendPkts.size(); - } else { - return 0; - } + return c->cSendPkts.remaining_capacity(); } void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, @@ -1235,16 +1252,14 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { return; - } - - std::lock_guard lock(c->mutex); - - auto iter = c->conMap.find(destinationId); - if(iter == c->conMap.end()) { + } else if(c->cSendPkts.full()) { UDPC_CHECK_LOG(c, UDPC_LoggingType::UDPC_ERROR, - "Failed to add packet to queue, no established connection " - "with recipient"); + "Failed to queue packet to ", + UDPC_atostr(ctx, destinationId.addr), + ", port = ", + destinationId.port, + " because queue is full"); return; } @@ -1254,10 +1269,10 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, sendInfo.sender.addr = in6addr_loopback; sendInfo.sender.port = ntohs(c->socketInfo.sin6_port); sendInfo.receiver.addr = destinationId.addr; - sendInfo.receiver.port = iter->second.port; + sendInfo.receiver.port = destinationId.port; sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4); - iter->second.sendPkts.push(sendInfo); + c->cSendPkts.push(sendInfo); } int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) { @@ -1265,7 +1280,6 @@ int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) { if(!c) { return 0; } - std::lock_guard lock(c->mutex); return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true); } diff --git a/cpp_impl/src/UDPConnection.h b/cpp_impl/src/UDPConnection.h index d0dbedb..1783f31 100644 --- a/cpp_impl/src/UDPConnection.h +++ b/cpp_impl/src/UDPConnection.h @@ -105,7 +105,7 @@ void UDPC_update(UDPC_HContext ctx); void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId); -int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId); +int UDPC_get_queue_send_available(UDPC_HContext ctx); void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, int isChecked, void *data, uint32_t size); diff --git a/cpp_impl/src/test/UDPC_NetworkTest.cpp b/cpp_impl/src/test/UDPC_NetworkTest.cpp index 4501dcd..9fa5556 100644 --- a/cpp_impl/src/test/UDPC_NetworkTest.cpp +++ b/cpp_impl/src/test/UDPC_NetworkTest.cpp @@ -126,12 +126,10 @@ int main(int argc, char **argv) { } else if(sendIds.size() > temp) { sendIds.resize(temp); } - for(unsigned int i = 0; i < temp; ++i) { - temp2 = UDPC_get_queue_send_available(context, list[i]); - for(unsigned int j = 0; j < temp2; ++j) { - temp3 = htonl(sendIds[i]++); - UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int)); - } + temp2 = UDPC_get_queue_send_available(context); + for(unsigned int i = 0; i < temp2; ++i) { + temp3 = htonl(sendIds[i % temp]++); + UDPC_queue_send(context, list[i % temp], 0, &temp3, sizeof(unsigned int)); } UDPC_free_list_connected(list); }