#include <ios>
#include <iomanip>
#include <regex>
+#include <cstdlib>
#if UDPC_PLATFORM == UDPC_PLATFORM_WINDOWS
#include <netioapi.h>
sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
sentPkts(),
sendPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
priorityPkts(UDPC_QUEUED_PKTS_MAX_SIZE),
-receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
received(std::chrono::steady_clock::now()),
sent(std::chrono::steady_clock::now()),
rtt(std::chrono::steady_clock::duration::zero())
loggingType(UDPC_WARNING),
#endif
atostrBufIndex(0),
+receivedPkts(UDPC_RECEIVED_PKTS_MAX_SIZE),
rng_engine(),
mutex()
{
+ for(unsigned int i = 0; i < UDPC_ATOSTR_SIZE; ++i) {
+ atostrBuf[i] = 0;
+ }
+
if(isThreaded) {
flags.set(0);
} else {
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
- std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+ std::memcpy(
+ UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+ UDPC_IPV6_ADDR_SUB(iter->first.addr),
+ 16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
}
UDPC_PacketInfo pInfo = UDPC::get_empty_pinfo();
+ pInfo.flags = 0x4;
pInfo.sender.addr = in6addr_loopback;
pInfo.receiver.addr = iter->first.addr;
pInfo.sender.port = socketInfo.sin6_port;
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
- std::memcpy(UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr), UDPC_IPV6_ADDR_SUB(iter->first.addr), 16);
+ std::memcpy(
+ UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
+ UDPC_IPV6_ADDR_SUB(iter->first.addr),
+ 16);
destinationInfo.sin6_port = htons(iter->second.port);
+ destinationInfo.sin6_flowinfo = 0;
+ destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sentPInfo.sender.port = socketInfo.sin6_port;
sentPInfo.receiver.port = iter->second.port;
- iter->second.sentPkts.push_back(std::move(pInfo));
+ iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts();
} else {
// is not check-received, only id stored in data array
sentPInfo.receiver.addr = iter->first.addr;
sentPInfo.sender.port = socketInfo.sin6_port;
sentPInfo.receiver.port = iter->second.port;
- *((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1;
+ *((uint32_t*)(sentPInfo.data + 8)) = htonl(iter->second.lseq - 1);
- iter->second.sentPkts.push_back(std::move(pInfo));
+ iter->second.sentPkts.push_back(std::move(sentPInfo));
iter->second.cleanupSentPkts();
}
sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
}
+ iter->second.sent = now;
}
// receive packet
recPktInfo.sender.port = ntohs(receivedData.sin6_port);
recPktInfo.receiver.port = ntohs(socketInfo.sin6_port);
- if(iter->second.receivedPkts.size() == iter->second.receivedPkts.capacity()) {
+ if(!receivedPkts.push(recPktInfo)) {
log(
UDPC_LoggingType::UDPC_WARNING,
"receivedPkts is full, removing oldest entry to make room");
- iter->second.receivedPkts.pop();
+ receivedPkts.pop();
+ receivedPkts.push(recPktInfo);
}
-
- iter->second.receivedPkts.push(recPktInfo);
} else if(bytes == 20) {
log(
UDPC_LoggingType::UDPC_VERBOSE,
ctx->update_impl();
ctx->mutex.unlock();
nextNow = std::chrono::steady_clock::now();
- std::this_thread::sleep_for(std::chrono::milliseconds(33) - (nextNow - now));
+ std::this_thread::sleep_for(std::chrono::milliseconds(11) - (nextNow - now));
}
}
}
void UDPC_queue_send(UDPC_HContext ctx, UDPC_ConnectionId destinationId,
- uint32_t isChecked, void *data, uint32_t size) {
+ int isChecked, void *data, uint32_t size) {
if(size == 0 || !data) {
return;
}
sendInfo.sender.port = c->socketInfo.sin6_port;
sendInfo.receiver.addr = destinationId.addr;
sendInfo.receiver.port = iter->second.port;
- sendInfo.flags = (isChecked ? 0x0 : 0x4);
+ sendInfo.flags = (isChecked != 0 ? 0x0 : 0x4);
iter->second.sendPkts.push(sendInfo);
}
return c->conMap.find(connectionId) == c->conMap.end() ? 0 : 1;
}
+UDPC_ConnectionId* UDPC_get_list_connected(UDPC_HContext ctx, unsigned int *size) {
+ UDPC::Context *c = UDPC::verifyContext(ctx);
+ if(!c) {
+ return nullptr;
+ }
+
+ std::lock_guard<std::mutex> lock(c->mutex);
+
+ if(c->conMap.empty()) {
+ return nullptr;
+ }
+ if(size) {
+ *size = c->conMap.size();
+ }
+
+ UDPC_ConnectionId *list = (UDPC_ConnectionId*)std::malloc(
+ sizeof(UDPC_ConnectionId) * (c->conMap.size() + 1));
+ UDPC_ConnectionId *current = list;
+ for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
+ *current = iter->first;
+ ++current;
+ }
+ *current = UDPC_ConnectionId{{0}, 0, 0};
+ return list;
+}
+
+void UDPC_free_list_connected(UDPC_ConnectionId *list) {
+ std::free(list);
+}
+
uint32_t UDPC_set_protocol_id(UDPC_HContext ctx, uint32_t id) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
}
-UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx) {
+UDPC_PacketInfo UDPC_get_received(UDPC_HContext ctx, unsigned int *remaining) {
UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) {
return UDPC::get_empty_pinfo();
}
- std::lock_guard<std::mutex> lock(c->mutex);
-
- // TODO impl
+ auto opt_pinfo = c->receivedPkts.top_and_pop();
+ if(remaining) {
+ *remaining = c->receivedPkts.size();
+ }
+ if(opt_pinfo) {
+ return *opt_pinfo;
+ }
return UDPC::get_empty_pinfo();
}
+int UDPC_set_received_capacity(UDPC_HContext ctx, unsigned int newCapacity) {
+ if(newCapacity == 0) {
+ return 0;
+ }
+
+ UDPC::Context *c = UDPC::verifyContext(ctx);
+ if(!c) {
+ return 0;
+ }
+
+ unsigned int status = 0;
+ c->receivedPkts.changeCapacity(newCapacity, &status);
+ if(status == 1) {
+ c->log(UDPC_LoggingType::UDPC_WARNING,
+ "Received Queue: Previous size was truncated to new capacity");
+ }
+ return 1;
+}
+
const char *UDPC_atostr_cid(UDPC_HContext ctx, UDPC_ConnectionId connectionId) {
return UDPC_atostr(ctx, connectionId.addr);
}
}
}
- if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0 && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) {
+ if(UDPC_IPV6_ADDR_SUB(addr)[i] == 0
+ && (headIndex - index <= 1 || c->atostrBuf[index] == ':')) {
continue;
} else {
std::stringstream sstream;
#include <thread>
#include <chrono>
#include <regex>
+#include <vector>
#include <UDPConnection.h>
puts("-lp <port> - listen port");
puts("-cl <addr> - connection addr (client only)");
puts("-cp <port> - connection port (client only)");
+ puts("-t <tick_count>");
+ puts("-n - do not add payload to packets");
}
int main(int argc, char **argv) {
const char *connectionAddr = nullptr;
const char *connectionPort = nullptr;
unsigned int tickLimit = 15;
+ bool noPayload = false;
while(argc > 0) {
if(std::strcmp(argv[0], "-c") == 0) {
isClient = true;
--argc; ++argv;
tickLimit = std::atoi(argv[0]);
printf("Set tick limit to %u\n", tickLimit);
+ } else if(std::strcmp(argv[0], "-n") == 0) {
+ noPayload = true;
+ puts("Disabling sending payload");
} else {
printf("ERROR: invalid argument \"%s\"\n", argv[0]);
usage();
}
UDPC_set_logging_type(context, UDPC_LoggingType::UDPC_INFO);
unsigned int tick = 0;
+ unsigned int temp = 0;
+ unsigned int temp2, temp3;
+ UDPC_ConnectionId *list = nullptr;
+ std::vector<unsigned int> sendIds;
+ UDPC_PacketInfo received;
while(true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
if(isClient && UDPC_has_connection(context, connectionId) == 0) {
UDPC_client_initiate_connection(context, connectionId);
}
+ if(!noPayload) {
+ list = UDPC_get_list_connected(context, &temp);
+ if(list) {
+ if(sendIds.size() < temp) {
+ sendIds.resize(temp, 0);
+ } else if(sendIds.size() > temp) {
+ sendIds.resize(temp);
+ }
+ for(unsigned int i = 0; i < temp; ++i) {
+ temp2 = UDPC_get_queue_send_available(context, list[i]);
+ for(unsigned int j = 0; j < temp2; ++j) {
+ temp3 = htonl(sendIds[i]++);
+ UDPC_queue_send(context, list[i], 0, &temp3, sizeof(unsigned int));
+ }
+ }
+ UDPC_free_list_connected(list);
+ }
+ do {
+ received = UDPC_get_received(context, &temp);
+ if(received.dataSize == sizeof(unsigned int)) {
+ if((received.flags & 0x8) != 0) {
+ temp2 = ntohl(*((unsigned int*)received.data));
+ printf("Got out of order, data = %u\n", temp2);
+ }
+ }
+ } while (temp > 0);
+ }
if(tick++ > tickLimit) {
break;
}