Impl disc. request (untested), del make_unique

This commit is contained in:
Stephen Seo 2019-12-17 20:04:31 +09:00
parent a4efd98890
commit dbdade3b00
3 changed files with 164 additions and 46 deletions

View file

@ -224,6 +224,7 @@ public:
std::unordered_map<UDPC_IPV6_ADDR_TYPE, std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher>, IPV6_Hasher> addrConMap;
// id to ipv6 address and port (as UDPC_ConnectionId)
std::unordered_map<uint32_t, UDPC_ConnectionId> idMap;
std::unordered_set<UDPC_ConnectionId, ConnectionIdHasher> deletionMap;
TSLQueue<UDPC_PacketInfo> receivedPkts;
TSLQueue<UDPC_PacketInfo> cSendPkts;
// handled internally

View file

@ -304,7 +304,7 @@ void UDPC::Context::update_impl() {
auto timeT = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
ss << std::put_time(std::gmtime(&timeT), "%c %Z");
auto timeString = ss.str();
newCon.verifyMessage = std::make_unique<char[]>(4 + timeString.size());
newCon.verifyMessage = std::unique_ptr<char[]>(new char[4 + timeString.size()]);
*((uint32_t*)newCon.verifyMessage.get()) = timeString.size();
std::memcpy(newCon.verifyMessage.get() + 4, timeString.c_str(), timeString.size());
#ifndef NDEBUG
@ -389,7 +389,7 @@ void UDPC::Context::update_impl() {
auto timeT = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
ss << std::put_time(std::gmtime(&timeT), "%c %Z");
auto timeString = ss.str();
newCon.verifyMessage = std::make_unique<char[]>(4 + timeString.size());
newCon.verifyMessage = std::unique_ptr<char[]>(new char[4 + timeString.size()]);
*((uint32_t*)newCon.verifyMessage.get()) = timeString.size();
std::memcpy(newCon.verifyMessage.get() + 4, timeString.c_str(), timeString.size());
#ifndef NDEBUG
@ -447,39 +447,17 @@ void UDPC::Context::update_impl() {
for(auto identIter = addrConIter->second.begin();
identIter != addrConIter->second.end();
++identIter) {
auto conIter = conMap.find(*identIter);
assert(conIter != conMap.end() &&
"conMap must have connection listed in addrConMap");
if(conIter->second.flags.test(4)) {
idMap.erase(conIter->second.id);
assert(conMap.find(*identIter) != conMap.end()
&& "conMap must have connection listed in "
"addrConMap");
deletionMap.insert(*identIter);
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, conIter->first, false});
}
conMap.erase(conIter);
}
addrConMap.erase(addrConIter);
}
} else {
// drop only specific connection with addr and port
auto iter = conMap.find(optE->conId);
if(iter != conMap.end()) {
if(iter->second.flags.test(4)) {
idMap.erase(iter->second.id);
}
auto addrConIter = addrConMap.find(optE->conId.addr);
if(addrConIter != addrConMap.end()) {
addrConIter->second.erase(optE->conId);
if(addrConIter->second.empty()) {
addrConMap.erase(addrConIter);
}
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, iter->first, false});
}
conMap.erase(iter);
deletionMap.insert(iter->first);
}
}
break;
@ -660,9 +638,83 @@ void UDPC::Context::update_impl() {
// update send (only if triggerSend flag is set)
for(auto iter = conMap.begin(); iter != conMap.end(); ++iter) {
if(!iter->second.flags.test(0)) {
auto delIter = deletionMap.find(iter->first);
if(!iter->second.flags.test(0) && delIter == deletionMap.end()) {
continue;
} else if(delIter != deletionMap.end()) {
if(iter->second.flags.test(3)) {
// not initiated connection yet, no need to send disconnect pkt
continue;
}
unsigned int sendSize = 0;
std::unique_ptr<char[]> buf;
if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else {
sendSize = UDPC_NSFULL_HEADER_SIZE;
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
UDPC::preparePacket(
buf.get(),
protocolID,
iter->second.id,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0x3);
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
if(crypto_sign_detached(
(unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE + 1), nullptr,
(unsigned char*)buf.get(), UDPC_MIN_HEADER_SIZE,
iter->second.sk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Failed to sign packet for peer ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port ",
iter->second.port);
continue;
}
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot send packet");
continue;
#endif
}
UDPC_IPV6_SOCKADDR_TYPE destinationInfo;
destinationInfo.sin6_family = AF_INET6;
std::memcpy(
UDPC_IPV6_ADDR_SUB(destinationInfo.sin6_addr),
UDPC_IPV6_ADDR_SUB(iter->first.addr),
16);
destinationInfo.sin6_port = htons(iter->second.port);
destinationInfo.sin6_flowinfo = 0;
destinationInfo.sin6_scope_id = iter->first.scope_id;
long int sentBytes = sendto(
socketHandle,
buf.get(),
sendSize,
0,
(struct sockaddr*) &destinationInfo,
sizeof(UDPC_IPV6_SOCKADDR_TYPE));
if(sentBytes != sendSize) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to send disconnect packet to ",
UDPC_atostr((UDPC_HContext)this, iter->first.addr),
", port = ",
iter->second.port);
continue;
}
continue;
}
// clear triggerSend flag
iter->second.flags.reset(0);
if(iter->second.flags.test(3)) {
@ -681,7 +733,7 @@ void UDPC::Context::update_impl() {
assert(iter->second.verifyMessage
&& "Verify message should already exist");
sendSize = UDPC_CCL_HEADER_SIZE + *((uint32_t*)iter->second.verifyMessage.get());
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
// set type 1
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(1);
// set public key
@ -708,7 +760,7 @@ void UDPC::Context::update_impl() {
#endif
} else {
sendSize = UDPC_CON_HEADER_SIZE;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((uint32_t*)(buf.get() + 20)) = 0;
}
UDPC::preparePacket(
@ -758,7 +810,7 @@ void UDPC::Context::update_impl() {
if(flags.test(2) && iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
sendSize = UDPC_CSR_HEADER_SIZE;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
// set type
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = htonl(2);
// set pubkey
@ -780,7 +832,7 @@ void UDPC::Context::update_impl() {
#endif
} else {
sendSize = UDPC_CON_HEADER_SIZE;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((uint32_t*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
UDPC::preparePacket(
@ -834,11 +886,11 @@ void UDPC::Context::update_impl() {
std::unique_ptr<char[]> buf;
if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else {
sendSize = UDPC_NSFULL_HEADER_SIZE;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
UDPC::preparePacket(
@ -929,11 +981,11 @@ void UDPC::Context::update_impl() {
unsigned int sendSize = 0;
if(flags.test(2) && iter->second.flags.test(6)) {
sendSize = UDPC_LSFULL_HEADER_SIZE + pInfo.dataSize;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 1;
} else {
sendSize = UDPC_NSFULL_HEADER_SIZE + pInfo.dataSize;
buf = std::make_unique<char[]>(sendSize);
buf = std::unique_ptr<char[]>(new char[sendSize]);
*((unsigned char*)(buf.get() + UDPC_MIN_HEADER_SIZE)) = 0;
}
@ -1033,6 +1085,29 @@ void UDPC::Context::update_impl() {
iter->second.sent = now;
}
// remove queued for deletion
for(auto delIter = deletionMap.begin(); delIter != deletionMap.end(); ++delIter) {
auto iter = conMap.find(*delIter);
if(iter != conMap.end()) {
if(iter->second.flags.test(4)) {
idMap.erase(iter->second.id);
}
auto addrConIter = addrConMap.find(delIter->addr);
if(addrConIter != addrConMap.end()) {
addrConIter->second.erase(*delIter);
if(addrConIter->second.empty()) {
addrConMap.erase(addrConIter);
}
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, iter->first, false});
}
conMap.erase(iter);
}
}
deletionMap.clear();
// receive packet
UDPC_IPV6_SOCKADDR_TYPE receivedData;
socklen_t receivedDataSize = sizeof(receivedData);
@ -1095,8 +1170,9 @@ void UDPC::Context::update_impl() {
bool isNotRecChecked = conID & UDPC_ID_NO_REC_CHK;
bool isResending = conID & UDPC_ID_RESENDING;
conID &= 0x0FFFFFFF;
UDPC_ConnectionId identifier{receivedData.sin6_addr, receivedData.sin6_scope_id, ntohs(receivedData.sin6_port)};
if(isConnect && bytes < (int)(UDPC_CON_HEADER_SIZE)) {
if(isConnect && !isPing && bytes < (int)(UDPC_CON_HEADER_SIZE)) {
// invalid packet size
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
@ -1106,7 +1182,8 @@ void UDPC::Context::update_impl() {
ntohs(receivedData.sin6_port),
", ignoring");
return;
} else if (!isConnect && bytes < (int)UDPC_NSFULL_HEADER_SIZE) {
} else if ((!isConnect || (isConnect && isPing))
&& bytes < (int)UDPC_NSFULL_HEADER_SIZE) {
// packet is too small
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
@ -1118,10 +1195,8 @@ void UDPC::Context::update_impl() {
return;
}
UDPC_ConnectionId identifier{receivedData.sin6_addr, receivedData.sin6_scope_id, ntohs(receivedData.sin6_port)};
uint32_t pktType = 0;
if(isConnect) {
if(isConnect && !isPing) {
pktType = ntohl(*((uint32_t*)(recvBuf + UDPC_MIN_HEADER_SIZE)));
switch(pktType) {
case 0: // client/server connect with libsodium disabled
@ -1153,7 +1228,7 @@ void UDPC::Context::update_impl() {
}
}
if(isConnect) {
if(isConnect && !isPing) {
// is connect packet and is accepting new connections
if(!flags.test(1)
&& conMap.find(identifier) == conMap.end()
@ -1227,7 +1302,7 @@ void UDPC::Context::update_impl() {
newConnection.peer_pk,
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
newConnection.verifyMessage = std::make_unique<char[]>(crypto_sign_BYTES);
newConnection.verifyMessage = std::unique_ptr<char[]>(new char[crypto_sign_BYTES]);
crypto_sign_detached(
(unsigned char*)newConnection.verifyMessage.get(),
nullptr,
@ -1376,7 +1451,7 @@ void UDPC::Context::update_impl() {
if(iter == conMap.end() || iter->second.flags.test(3)
|| !iter->second.flags.test(4) || iter->second.id != conID) {
return;
} else if(isPing) {
} else if(isPing && !isConnect) {
iter->second.flags.set(0);
}
@ -1417,6 +1492,32 @@ void UDPC::Context::update_impl() {
", good mode = ", iter->second.flags.test(1) ? "yes" : "no",
isPing ? ", ping" : "");
// check if is delete
if(isConnect && isPing) {
auto conIter = conMap.find(identifier);
if(conIter != conMap.end()) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
"Packet is request-disconnect packet, deleting connection...");
if(conIter->second.flags.test(4)) {
idMap.erase(conIter->second.id);
}
auto addrConIter = addrConMap.find(identifier.addr);
if(addrConIter != addrConMap.end()) {
addrConIter->second.erase(identifier);
if(addrConIter->second.empty()) {
addrConMap.erase(addrConIter);
}
}
if(isReceivingEvents.load()) {
externalEvents.push(UDPC_Event{
UDPC_ET_DISCONNECTED, identifier, false});
}
conMap.erase(conIter);
return;
}
}
// update rtt
for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) {
uint32_t id = ntohl(*((uint32_t*)(sentIter->data + 8)));

View file

@ -304,6 +304,22 @@ int main(int argc, char **argv) {
break;
}
}
UDPC_set_accept_new_connections(context, 0);
puts("Dropping all connections...");
UDPC_ConnectionId *ids = UDPC_get_list_connected(context, NULL);
UDPC_ConnectionId *current = ids;
while(current->scope_id != 0 && current->port != 0) {
UDPC_drop_connection(context, *current, 0);
++current;
}
UDPC_free_list_connected(ids);
puts("Waiting 2 seconds for disconnect packets to be sent...");
sleep_seconds(2);
puts("Cleaning up UDPC context...");
UDPC_destroy(context);
return 0;