#define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
#define UDPC_SENT_PKTS_MAX_SIZE 33
-#define UDPC_QUEUED_PKTS_MAX_SIZE 32
+#define UDPC_QUEUED_PKTS_MAX_SIZE 64
#define UDPC_RECEIVED_PKTS_MAX_SIZE 64
#define UDPC_ID_CONNECT 0x80000000
uint32_t scope_id;
uint16_t port; // in native order
std::deque<UDPC_PacketInfo> sentPkts;
- TSQueue<UDPC_PacketInfo> sendPkts;
- TSQueue<UDPC_PacketInfo> priorityPkts;
+ std::deque<UDPC_PacketInfo> sendPkts;
+ std::deque<UDPC_PacketInfo> priorityPkts;
// pkt id to pkt shared_ptr
std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap;
std::chrono::steady_clock::time_point received;
// id to ipv6 address and port (as UDPC_ConnectionId)
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
TSQueue<UDPC_PacketInfo> receivedPkts;
+ TSQueue<UDPC_PacketInfo> cSendPkts;
std::default_random_engine rng_engine;
addr({0}),
port(0),
sentPkts(),
-sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
+sendPkts(),
+priorityPkts(),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
scope_id(scope_id),
port(port),
sentPkts(),
-sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
+sendPkts(),
+priorityPkts(),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
#endif
atostrBufIndex(0),
receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
+cSendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
rng_engine(),
mutex()
{
}
}
+ // move queued in cSendPkts to existing connection's sendPkts
+ {
+ unsigned int rsize = 0;
+ do {
+ auto next = cSendPkts.top_and_pop_and_rsize(&rsize);
+ if(next) {
+ if(auto iter = conMap.find(next.value().receiver);
+ iter != conMap.end()) {
+ iter->second.sendPkts.push_back(next.value());
+ } else {
+ UDPC_CHECK_LOG(this,
+ UDPC_LoggingType::UDPC_WARNING,
+ "Dropped queued packet to ",
+ UDPC_atostr(
+ (UDPC_HContext)this,
+ next.value().receiver.addr),
+ ", port = ",
+ next.value().receiver.port,
+ " due to connection not existing");
+ }
+ }
+ } while(rsize != 0);
+ }
+
// update send (only if triggerSend flag is set)
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
if(!iter->second.flags.test(0)) {
UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
bool isResending = false;
if(!iter->second.priorityPkts.empty()) {
- // TODO verify getting struct copy is valid
- pInfo = std::move(iter->second.priorityPkts.top().value());
- iter->second.priorityPkts.pop();
+ pInfo = iter->second.priorityPkts.front();
+ iter->second.priorityPkts.pop_front();
isResending = true;
} else {
- pInfo = std::move(iter->second.sendPkts.top().value());
- iter->second.sendPkts.pop();
+ pInfo = iter->second.sendPkts.front();
+ iter->second.sendPkts.pop_front();
}
std::unique_ptr<char[]> buf = std::make_unique<char[]>(UDPC_FULL_HEADER_SIZE + pInfo.dataSize);
UDPC::preparePacket(
resendingData.dataSize = sentIter->dataSize - UDPC_FULL_HEADER_SIZE;
std::memcpy(resendingData.data, sentIter->data + UDPC_FULL_HEADER_SIZE, resendingData.dataSize);
resendingData.flags = 0;
- iter->second.priorityPkts.push(resendingData);
+ iter->second.priorityPkts.push_back(resendingData);
}
break;
}
}
}
-int UDPC_get_queue_send_available(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
+int UDPC_get_queue_send_available(UDPC_HContext ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return 0;
}
- std::lock_guard<std::mutex> lock(c->mutex);
-
- auto iter = c->conMap.find(connectionId);
- if(iter != c->conMap.end()) {
- return iter->second.sendPkts.capacity() - iter->second.sendPkts.size();
- } else {
- return 0;
- }
+ return c->cSendPkts.remaining_capacity();
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return;
- }
-
- std::lock_guard<std::mutex> lock(c->mutex);
-
- auto iter = c->conMap.find(destinationId);
- if(iter == c->conMap.end()) {
+ } else if(c->cSendPkts.full()) {
UDPC_CHECK_LOG(c,
UDPC_LoggingType::UDPC_ERROR,
- "Failed to add packet to queue, no established connection "
- "with recipient");
+ "Failed to queue packet to ",
+ UDPC_atostr(ctx, destinationId.addr),
+ ", port = ",
+ destinationId.port,
+ " because queue is full");
return;
}
sendInfo.sender.addr = in6addr_loopback;
sendInfo.sender.port = ntohs(c->socketInfo.sin6_port);
sendInfo.receiver.addr = destinationId.addr;
- sendInfo.receiver.port = iter->second.port;
+ sendInfo.receiver.port = destinationId.port;
sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4);
- iter->second.sendPkts.push(sendInfo);
+ c->cSendPkts.push(sendInfo);
}
int UDPC_set_accept_new_connections(UDPC_HContext ctx, int isAccepting) {
if(!c) {
return 0;
}
- std::lock_guard<std::mutex> lock(c->mutex);
return c->isAcceptNewConnections.exchange(isAccepting == 0 ? false : true);
}