diff --git a/cpp_impl/src/TSQueue.hpp b/cpp_impl/src/TSQueue.hpp index 81dadf5..714788f 100644 --- a/cpp_impl/src/TSQueue.hpp +++ b/cpp_impl/src/TSQueue.hpp @@ -28,7 +28,12 @@ class TSQueue { bool pop(); std::optional top_and_pop(); void clear(); - void changeCapacity(unsigned int newCapacity); + /* + * status == + * 0 - success + * 1 - success, but previous size was reduced + */ + void changeCapacity(unsigned int newCapacity, unsigned int *status); unsigned int size(); unsigned int capacity(); bool empty(); @@ -119,8 +124,15 @@ void TSQueue::clear() { } template -void TSQueue::changeCapacity(unsigned int newCapacity) { +void TSQueue::changeCapacity(unsigned int newCapacity, unsigned int *status) { std::lock_guard lock(mutex); + if(status) { + if(rb.getSize() < newCapacity) { + *status = 1; + } else { + *status = 0; + } + } rb.changeCapacity(newCapacity); } diff --git a/cpp_impl/src/UDPC_Defines.hpp b/cpp_impl/src/UDPC_Defines.hpp index 6eefe09..778cde7 100644 --- a/cpp_impl/src/UDPC_Defines.hpp +++ b/cpp_impl/src/UDPC_Defines.hpp @@ -5,7 +5,7 @@ #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3 #define UDPC_SENT_PKTS_MAX_SIZE 33 #define UDPC_QUEUED_PKTS_MAX_SIZE 32 -#define UDPC_RECEIVED_PKTS_MAX_SIZE 50 +#define UDPC_RECEIVED_PKTS_MAX_SIZE 64 #define UDPC_ID_CONNECT 0x80000000 #define UDPC_ID_PING 0x40000000 @@ -103,7 +103,6 @@ struct ConnectionData { std::deque sentPkts; TSQueue sendPkts; TSQueue priorityPkts; - TSQueue receivedPkts; // pkt id to pkt shared_ptr std::unordered_map sentInfoMap; std::chrono::steady_clock::time_point received; @@ -253,6 +252,7 @@ public: std::unordered_map, IPV6_Hasher> addrConMap; // id to ipv6 address and port (as UDPC_ConnectionId) std::unordered_map idMap; + TSQueue receivedPkts; std::default_random_engine rng_engine; diff --git a/cpp_impl/src/UDPConnection.cpp b/cpp_impl/src/UDPConnection.cpp index c5c8e63..878d792 100644 --- a/cpp_impl/src/UDPConnection.cpp +++ b/cpp_impl/src/UDPConnection.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS #include @@ -90,7 +91,6 @@ port(0), sentPkts(), sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), -receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE), received(std::chrono::steady_clock::now()), sent(std::chrono::steady_clock::now()), rtt(std::chrono::steady_clock::duration::zero()) @@ -120,7 +120,6 @@ port(port), sentPkts(), sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE), priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE), -receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE), received(std::chrono::steady_clock::now()), sent(std::chrono::steady_clock::now()), rtt(std::chrono::steady_clock::duration::zero()) @@ -157,9 +156,14 @@ loggingType(UDPC_INFO), loggingType(UDPC_WARNING), #endif atostrBufIndex(0), +receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE), rng_engine(), mutex() { + for(unsigned int i = 0; i < UDPC_ATOSTR_SIZE; ++i) { + atostrBuf[i] = 0; + } + if(isThreaded) { flags.set(0); } else { @@ -389,7 +393,10 @@ void UDPC::Context::update_impl() { UDPC_IPV6_SOCKADDR_TYPE destinationInfo; destinationInfo.sin6_family = AF_INET6; - std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); + std::memcpy( + UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), + UDPC_IPV6_ADDR_SUB(iter->first.addr), + 16); destinationInfo.sin6_port = htons(iter->second.port); destinationInfo.sin6_flowinfo = 0; destinationInfo.sin6_scope_id = iter->first.scope_id; @@ -411,6 +418,7 @@ void UDPC::Context::update_impl() { } UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo(); + pInfo.flags = 0x4; pInfo.sender.addr = in6addr_loopback; pInfo.receiver.addr = iter->first.addr; pInfo.sender.port = socketInfo.sin6_port; @@ -450,8 +458,13 @@ void UDPC::Context::update_impl() { UDPC_IPV6_SOCKADDR_TYPE destinationInfo; destinationInfo.sin6_family = AF_INET6; - std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16); + std::memcpy( + UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), + UDPC_IPV6_ADDR_SUB(iter->first.addr), + 16); destinationInfo.sin6_port = htons(iter->second.port); + destinationInfo.sin6_flowinfo = 0; + destinationInfo.sin6_scope_id = iter->first.scope_id; long int sentBytes = sendto( socketHandle, buf.get(), @@ -480,7 +493,7 @@ void UDPC::Context::update_impl() { sentPInfo.sender.port = socketInfo.sin6_port; sentPInfo.receiver.port = iter->second.port; - iter->second.sentPkts.push_back(std::move(pInfo)); + iter->second.sentPkts.push_back(std::move(sentPInfo)); iter->second.cleanupSentPkts(); } else { // is not check-received, only id stored in data array @@ -491,9 +504,9 @@ void UDPC::Context::update_impl() { sentPInfo.receiver.addr = iter->first.addr; sentPInfo.sender.port = socketInfo.sin6_port; sentPInfo.receiver.port = iter->second.port; - *((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1; + *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1); - iter->second.sentPkts.push_back(std::move(pInfo)); + iter->second.sentPkts.push_back(std::move(sentPInfo)); iter->second.cleanupSentPkts(); } @@ -502,6 +515,7 @@ void UDPC::Context::update_impl() { sentPktInfo->id = iter->second.lseq - 1; iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo)); } + iter->second.sent = now; } // receive packet @@ -785,14 +799,13 @@ void UDPC::Context::update_impl() { recPktInfo.sender.port = ntohs(receivedData.sin6_port); recPktInfo.receiver.port = ntohs(socketInfo.sin6_port); - if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) { + if(!receivedPkts.push(recPktInfo)) { log( UDPC_LoggingType::UDPC_WARNING, "receivedPkts is full, removing oldest entry to make room"); - iter->second.receivedPkts.pop(); + receivedPkts.pop(); + receivedPkts.push(recPktInfo); } - - iter->second.receivedPkts.push(recPktInfo); } else if(bytes == 20) { log( UDPC_LoggingType::UDPC_VERBOSE, @@ -903,7 +916,7 @@ void UDPC::threadedUpdate(Context *ctx) { ctx->update_impl(); ctx->mutex.unlock(); nextNow = std::chrono::steady_clock::now(); - std::this_thread::sleep_for(std::chrono::milliseconds(33) - (nextNow - now)); + std::this_thread::sleep_for(std::chrono::milliseconds(11) - (nextNow - now)); } } @@ -1095,7 +1108,7 @@ int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectio } void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, - uint32_t isChecked, void *data, uint32_t size) { + int isChecked, void *data, uint32_t size) { if(size == 0 || !data) { return; } @@ -1123,7 +1136,7 @@ void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, sendInfo.sender.port = c->socketInfo.sin6_port; sendInfo.receiver.addr = destinationId.addr; sendInfo.receiver.port = iter->second.port; - sendInfo.flags = (isChecked ? 0x0 : 0x4); + sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4); iter->second.sendPkts.push(sendInfo); } @@ -1193,6 +1206,36 @@ int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1; } +UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size) { + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return nullptr; + } + + std::lock_guard lock(c->mutex); + + if(c->conMap.empty()) { + return nullptr; + } + if(size) { + *size = c->conMap.size(); + } + + UDPC_ConnectionId *list = (UDPC_ConnectionId*)std::malloc( + sizeof(UDPC_ConnectionId) * (c->conMap.size() + 1)); + UDPC_ConnectionId *current = list; + for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { + *current = iter->first; + ++current; + } + *current = UDPC_ConnectionId{{0}, 0, 0}; + return list; +} + +void UDPC_free_list_connected(UDPC_ConnectionId *list) { + std::free(list); +} + uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { @@ -1210,18 +1253,41 @@ UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggi return static_cast(c->loggingType.exchange(loggingType)); } -UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx) { +UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining) { UDPC::Context *c = UDPC::verifyContext(ctx); if(!c) { return UDPC::get_empty_pinfo(); } - std::lock_guard lock(c->mutex); - - // TODO impl + auto opt_pinfo = c->receivedPkts.top_and_pop(); + if(remaining) { + *remaining = c->receivedPkts.size(); + } + if(opt_pinfo) { + return *opt_pinfo; + } return UDPC::get_empty_pinfo(); } +int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity) { + if(newCapacity == 0) { + return 0; + } + + UDPC::Context *c = UDPC::verifyContext(ctx); + if(!c) { + return 0; + } + + unsigned int status = 0; + c->receivedPkts.changeCapacity(newCapacity, &status); + if(status == 1) { + c->log(UDPC_LoggingType::UDPC_WARNING, + "Received Queue: Previous size was truncated to new capacity"); + } + return 1; +} + const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId) { return UDPC_atostr(ctx, connectionId.addr); } @@ -1255,7 +1321,8 @@ const char *UDPC_atostr(UDPC_HContext ctx, UDPC_IPV6_ADDR_TYPE addr) { } } - if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0 && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) { + if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0 + && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) { continue; } else { std::stringstream sstream; diff --git a/cpp_impl/src/UDPConnection.h b/cpp_impl/src/UDPConnection.h index 93a068b..3d24e02 100644 --- a/cpp_impl/src/UDPConnection.h +++ b/cpp_impl/src/UDPConnection.h @@ -103,7 +103,7 @@ void UDPC_client_initiate_connection(UDPC_HContext ctx, UDPC_ConnectionId connec int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId); void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId, - uint32_t isChecked, void *data, uint32_t size); + int isChecked, void *data, uint32_t size); int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting); @@ -111,11 +111,17 @@ int UDPC_drop_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId, bool int UDPC_has_connection(UDPC_HContext ctx, UDPC_ConnectionId connectionId); +UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size); + +void UDPC_free_list_connected(UDPC_ConnectionId *list); + uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id); UDPC_LoggingType UDPC_set_logging_type(UDPC_HContext ctx, UDPC_LoggingType loggingType); -UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx); +UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining); + +int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity); const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId); diff --git a/cpp_impl/src/test/TestTSQueue.cpp b/cpp_impl/src/test/TestTSQueue.cpp index fc6dc20..9721be4 100644 --- a/cpp_impl/src/test/TestTSQueue.cpp +++ b/cpp_impl/src/test/TestTSQueue.cpp @@ -62,7 +62,7 @@ TEST(TSQueue, Usage) EXPECT_FALSE(q.push(temp)); EXPECT_EQ(q.size(), 4); - q.changeCapacity(8); + q.changeCapacity(8, nullptr); EXPECT_EQ(q.size(), 4); temp = 10; @@ -87,7 +87,7 @@ TEST(TSQueue, Usage) EXPECT_EQ(400, q.top()); - q.changeCapacity(1); + q.changeCapacity(1, nullptr); // { 10 } diff --git a/cpp_impl/src/test/UDPC_NetworkTest.cpp b/cpp_impl/src/test/UDPC_NetworkTest.cpp index ab9df14..4501dcd 100644 --- a/cpp_impl/src/test/UDPC_NetworkTest.cpp +++ b/cpp_impl/src/test/UDPC_NetworkTest.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -15,6 +16,8 @@ void usage() { puts("-lp - listen port"); puts("-cl - connection addr (client only)"); puts("-cp - connection port (client only)"); + puts("-t "); + puts("-n - do not add payload to packets"); } int main(int argc, char **argv) { @@ -30,6 +33,7 @@ int main(int argc, char **argv) { const char *connectionAddr = nullptr; const char *connectionPort = nullptr; unsigned int tickLimit = 15; + bool noPayload = false; while(argc > 0) { if(std::strcmp(argv[0], "-c") == 0) { isClient = true; @@ -51,6 +55,9 @@ int main(int argc, char **argv) { --argc; ++argv; tickLimit = std::atoi(argv[0]); printf("Set tick limit to %u\n", tickLimit); + } else if(std::strcmp(argv[0], "-n") == 0) { + noPayload = true; + puts("Disabling sending payload"); } else { printf("ERROR: invalid argument \"%s\"\n", argv[0]); usage(); @@ -101,11 +108,43 @@ int main(int argc, char **argv) { } UDPC_set_logging_type(context, UDPC_LoggingType::UDPC_INFO); unsigned int tick = 0; + unsigned int temp = 0; + unsigned int temp2, temp3; + UDPC_ConnectionId *list = nullptr; + std::vector sendIds; + UDPC_PacketInfo received; while(true) { std::this_thread::sleep_for(std::chrono::seconds(1)); if(isClient && UDPC_has_connection(context, connectionId) == 0) { UDPC_client_initiate_connection(context, connectionId); } + if(!noPayload) { + list = UDPC_get_list_connected(context, &temp); + if(list) { + if(sendIds.size() < temp) { + sendIds.resize(temp, 0); + } else if(sendIds.size() > temp) { + sendIds.resize(temp); + } + for(unsigned int i = 0; i < temp; ++i) { + temp2 = UDPC_get_queue_send_available(context, list[i]); + for(unsigned int j = 0; j < temp2; ++j) { + temp3 = htonl(sendIds[i]++); + UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int)); + } + } + UDPC_free_list_connected(list); + } + do { + received = UDPC_get_received(context, &temp); + if(received.dataSize == sizeof(unsigned int)) { + if((received.flags & 0x8) != 0) { + temp2 = ntohl(*((unsigned int*)received.data)); + printf("Got out of order, data = %u\n", temp2); + } + } + } while (temp > 0); + } if(tick++ > tickLimit) { break; }