Receive packets in loop til none left in interval

Also some formatting fixes (max 80 chars in edited code).
This commit is contained in:
Stephen Seo 2022-12-04 19:57:37 +09:00
parent b7cd3a00c7
commit 77ac7f88a5

View file

@ -1116,6 +1116,7 @@ void UDPC::Context::update_impl() {
deletionMap.clear();
// receive packet
do {
UDPC_IPV6_SOCKADDR_TYPE receivedData;
socklen_t receivedDataSize = sizeof(receivedData);
int bytes = recvfrom(
@ -1162,7 +1163,8 @@ void UDPC::Context::update_impl() {
// Invalid protocol id in packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet has invalid protocol id, ignoring packet from ",
"Received packet has invalid protocol id, ignoring packet "
"from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
ntohs(receivedData.sin6_port));
@ -1183,7 +1185,8 @@ void UDPC::Context::update_impl() {
bool isNotRecChecked = conID & UDPC_ID_NO_REC_CHK;
bool isResending = conID & UDPC_ID_RESENDING;
conID &= 0x0FFFFFFF;
UDPC_ConnectionId identifier = UDPC_create_id_full(receivedData.sin6_addr,
UDPC_ConnectionId identifier =
UDPC_create_id_full(receivedData.sin6_addr,
receivedData.sin6_scope_id,
ntohs(receivedData.sin6_port));
@ -1253,24 +1256,30 @@ void UDPC::Context::update_impl() {
// is receiving as server, connection did not already exist
int authPolicy = this->authPolicy.load();
if(pktType == 1 && !flags.test(2)
&& authPolicy == UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
&& authPolicy
== UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Client peer ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
" port ",
ntohs(receivedData.sin6_port),
" attempted connection with packet authentication "
"enabled, but auth is disabled and AuthPolicy is STRICT");
"enabled, but auth is disabled and AuthPolicy is "
"STRICT");
return;
} else if(pktType == 0 && flags.test(2)
&& authPolicy == UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
&& authPolicy
== UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Client peer ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
" port ",
ntohs(receivedData.sin6_port),
" attempted connection with packet authentication "
"disabled, but auth is enabled and AuthPolicy is STRICT");
"disabled, but auth is enabled and AuthPolicy is "
"STRICT");
return;
}
unsigned char *sk = nullptr;
@ -1296,9 +1305,10 @@ void UDPC::Context::update_impl() {
if(newConnection.flags.test(5)) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_ERROR,
"Failed to init ConnectionData instance (libsodium init "
"fail) while server establishing connection with ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
"Failed to init ConnectionData instance (libsodium init"
" fail) while server establishing connection with ",
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
ntohs(receivedData.sin6_port));
return;
@ -1310,20 +1320,26 @@ void UDPC::Context::update_impl() {
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
{
std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(newConnection.peer_pk)) == peerPKWhitelist.end()) {
std::lock_guard<std::mutex>
pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty()
&& peerPKWhitelist.find(
UDPC::PKContainer(newConnection.peer_pk))
== peerPKWhitelist.end()) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"peer_pk is not in whitelist, not establishing "
"connection with client");
return;
}
}
newConnection.verifyMessage = std::unique_ptr<char[]>(new char[crypto_sign_BYTES]);
newConnection.verifyMessage =
std::unique_ptr<char[]>(new char[crypto_sign_BYTES]);
std::time_t currentTime = std::time(nullptr);
uint64_t receivedTime;
std::memcpy(
&receivedTime,
recvBuf + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES + 4,
recvBuf + UDPC_MIN_HEADER_SIZE + 4
+ crypto_sign_PUBLICKEYBYTES + 4,
8);
UDPC::be64((char*)&receivedTime);
# ifndef NDEBUG
@ -1334,7 +1350,8 @@ void UDPC::Context::update_impl() {
}
# endif
std::time_t receivedTimeT = receivedTime;
if(currentTime < receivedTimeT || currentTime - receivedTimeT > 3) {
if(currentTime < receivedTimeT
|| currentTime - receivedTimeT > 3) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"Got invalid epoch time from client, ignoring");
return;
@ -1342,13 +1359,15 @@ void UDPC::Context::update_impl() {
crypto_sign_detached(
(unsigned char*)newConnection.verifyMessage.get(),
nullptr,
(unsigned char*)(recvBuf + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES),
(unsigned char*)(recvBuf + UDPC_MIN_HEADER_SIZE + 4
+ crypto_sign_PUBLICKEYBYTES),
12,
newConnection.sk);
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot process received packet");
"libsodium is disabled, cannot process received "
"packet");
return;
#endif
}
@ -1363,13 +1382,15 @@ void UDPC::Context::update_impl() {
", libsodium enabled" : ", libsodium disabled");
idMap.insert(std::make_pair(newConnection.id, identifier));
conMap.insert(std::make_pair(identifier, std::move(newConnection)));
conMap.insert(std::make_pair(identifier,
std::move(newConnection)));
auto addrConIter = addrConMap.find(identifier.addr);
if(addrConIter == addrConMap.end()) {
auto insertResult = addrConMap.insert(
std::make_pair(
identifier.addr,
std::unordered_set<UDPC_ConnectionId, UDPC::ConnectionIdHasher>{}
std::unordered_set<UDPC_ConnectionId,
UDPC::ConnectionIdHasher>{}
));
assert(insertResult.second
&& "Must successfully insert into addrConMap");
@ -1389,45 +1410,58 @@ void UDPC::Context::update_impl() {
if(iter == conMap.end() || !iter->second.flags.test(3)) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_DEBUG,
"client dropped pkt from ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
", port ", ntohs(receivedData.sin6_port));
return;
}
int authPolicy = this->authPolicy.load();
if(pktType == 2 && !iter->second.flags.test(6)
&& authPolicy == UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
// This block actually should never happen, because the server
// receives a packet first. If client requests without auth,
// then the server will either deny connection (if strict) or
// fallback to a connection without auth (if fallback).
&& authPolicy
== UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
// This block actually should never happen, because the
// server receives a packet first. If client requests
// without auth, then the server will either deny
// connection (if strict) or fallback to a connection
// without auth (if fallback).
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Server peer ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
" port ",
ntohs(receivedData.sin6_port),
" attempted connection with packet authentication "
"enabled, but auth is disabled and AuthPolicy is STRICT");
"enabled, but auth is disabled and AuthPolicy is "
"STRICT");
return;
} else if(pktType == 0 && iter->second.flags.test(6)
&& authPolicy == UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
&& authPolicy
== UDPC_AuthPolicy::UDPC_AUTH_POLICY_STRICT) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"Server peer ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
" port ",
ntohs(receivedData.sin6_port),
" attempted connection with packet authentication "
"disabled, but auth is enabled and AuthPolicy is STRICT");
"disabled, but auth is enabled and AuthPolicy is "
"STRICT");
return;
}
if(pktType == 2 && flags.test(2) && iter->second.flags.test(6)) {
if(pktType == 2 && flags.test(2)
&& iter->second.flags.test(6)) {
#ifdef UDPC_LIBSODIUM_ENABLED
std::memcpy(iter->second.peer_pk,
recvBuf + UDPC_MIN_HEADER_SIZE + 4,
crypto_sign_PUBLICKEYBYTES);
{
std::lock_guard<std::mutex> pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty() && peerPKWhitelist.find(UDPC::PKContainer(iter->second.peer_pk)) == peerPKWhitelist.end()) {
std::lock_guard<std::mutex>
pkWhitelistLock(peerPKWhitelistMutex);
if(!peerPKWhitelist.empty()
&& peerPKWhitelist.find(
UDPC::PKContainer(iter->second.peer_pk))
== peerPKWhitelist.end()) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"peer_pk is not in whitelist, not establishing "
"connection with server");
@ -1435,13 +1469,15 @@ void UDPC::Context::update_impl() {
}
}
if(crypto_sign_verify_detached(
(unsigned char*)(recvBuf + UDPC_MIN_HEADER_SIZE + 4 + crypto_sign_PUBLICKEYBYTES),
(unsigned char*)(recvBuf + UDPC_MIN_HEADER_SIZE + 4
+ crypto_sign_PUBLICKEYBYTES),
(unsigned char*)(iter->second.verifyMessage.get()),
12,
iter->second.peer_pk) != 0) {
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_WARNING,
"Failed to verify peer (server) ",
UDPC_atostr((UDPC_HContext)this, receivedData.sin6_addr),
UDPC_atostr(
(UDPC_HContext)this, receivedData.sin6_addr),
", port = ",
ntohs(receivedData.sin6_port));
return;
@ -1449,7 +1485,8 @@ void UDPC::Context::update_impl() {
#else
assert(!"libsodium disabled, invalid state");
UDPC_CHECK_LOG(this, UDPC_LoggingType::UDPC_ERROR,
"libsodium is disabled, cannot process received packet");
"libsodium is disabled, cannot process received "
"packet");
return;
#endif
} else if(pktType == 0 && iter->second.flags.test(6)) {
@ -1496,8 +1533,12 @@ void UDPC::Context::update_impl() {
#ifdef UDPC_LIBSODIUM_ENABLED
// verify signature of header
unsigned char sig[crypto_sign_BYTES];
std::memcpy(sig, recvBuf + UDPC_MIN_HEADER_SIZE + 1, crypto_sign_BYTES);
std::memset(recvBuf + UDPC_MIN_HEADER_SIZE + 1, 0, crypto_sign_BYTES);
std::memcpy(sig,
recvBuf + UDPC_MIN_HEADER_SIZE + 1,
crypto_sign_BYTES);
std::memset(recvBuf + UDPC_MIN_HEADER_SIZE + 1,
0,
crypto_sign_BYTES);
if(crypto_sign_verify_detached(
sig,
(unsigned char*)recvBuf,
@ -1539,7 +1580,8 @@ void UDPC::Context::update_impl() {
if(conIter != conMap.end()) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
"Packet is request-disconnect packet, deleting connection...");
"Packet is request-disconnect packet, deleting "
"connection...");
if(conIter->second.flags.test(4)) {
idMap.erase(conIter->second.id);
}
@ -1561,7 +1603,9 @@ void UDPC::Context::update_impl() {
}
// update rtt
for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) {
for(auto sentIter = iter->second.sentPkts.rbegin();
sentIter != iter->second.sentPkts.rend();
++sentIter) {
uint32_t id;
std::memcpy(&id, sentIter->data + 8, 4);
id = ntohl(id);
@ -1576,7 +1620,8 @@ void UDPC::Context::update_impl() {
iter->second.rtt -= (iter->second.rtt - diff) / 10;
}
iter->second.flags.set(2, iter->second.rtt <= UDPC::GOOD_RTT_LIMIT);
iter->second.flags.set(
2, iter->second.rtt <= UDPC::GOOD_RTT_LIMIT);
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
@ -1598,23 +1643,33 @@ void UDPC::Context::update_impl() {
}
// pkt not received yet, find it in sent to check if it timed out
for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) {
for(auto sentIter = iter->second.sentPkts.rbegin();
sentIter != iter->second.sentPkts.rend();
++sentIter) {
uint32_t sentID;
std::memcpy(&sentID, sentIter->data + 8, 4);
sentID = ntohl(sentID);
if(sentID == rseq) {
if((sentIter->flags & 0x4) != 0 || (sentIter->flags & 0x8) != 0) {
if((sentIter->flags & 0x4) != 0
|| (sentIter->flags & 0x8) != 0) {
// already resent or not rec-checked pkt
break;
}
auto sentInfoIter = iter->second.sentInfoMap.find(sentID);
assert(sentInfoIter != iter->second.sentInfoMap.end()
&& "Every entry in sentPkts must have a corresponding entry in sentInfoMap");
&& "Every entry in sentPkts must have a "
"corresponding entry in sentInfoMap");
auto duration = now - sentInfoIter->second->sentTime;
if(duration > UDPC::PACKET_TIMEOUT_TIME) {
bool pktSigned = sentIter->data[UDPC_MIN_HEADER_SIZE] == 1;
if((pktSigned && sentIter->dataSize <= UDPC_LSFULL_HEADER_SIZE)
|| (!pktSigned && sentIter->dataSize <= UDPC_NSFULL_HEADER_SIZE)) {
bool pktSigned =
sentIter->data[UDPC_MIN_HEADER_SIZE] == 1;
if((pktSigned
&& sentIter->dataSize
<= UDPC_LSFULL_HEADER_SIZE
) ||
(!pktSigned
&& sentIter->dataSize
<= UDPC_NSFULL_HEADER_SIZE)) {
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
"Timed out packet has no payload (probably "
@ -1625,14 +1680,18 @@ void UDPC::Context::update_impl() {
UDPC_PacketInfo resendingData = UDPC::get_empty_pinfo();
if(pktSigned) {
resendingData.dataSize = sentIter->dataSize - UDPC_LSFULL_HEADER_SIZE;
resendingData.data = (char*)std::malloc(resendingData.dataSize);
resendingData.dataSize =
sentIter->dataSize - UDPC_LSFULL_HEADER_SIZE;
resendingData.data =
(char*)std::malloc(resendingData.dataSize);
std::memcpy(resendingData.data,
sentIter->data + UDPC_LSFULL_HEADER_SIZE,
resendingData.dataSize);
} else {
resendingData.dataSize = sentIter->dataSize - UDPC_NSFULL_HEADER_SIZE;
resendingData.data = (char*)std::malloc(resendingData.dataSize);
resendingData.dataSize =
sentIter->dataSize - UDPC_NSFULL_HEADER_SIZE;
resendingData.data =
(char*)std::malloc(resendingData.dataSize);
std::memcpy(resendingData.data,
sentIter->data + UDPC_NSFULL_HEADER_SIZE,
resendingData.dataSize);
@ -1663,7 +1722,8 @@ void UDPC::Context::update_impl() {
// already received packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet is already marked as received, ignoring it");
"Received packet is already marked as received, "
"ignoring it");
return;
}
iter->second.ack |= 0x80000000 >> (diff - 1);
@ -1677,7 +1737,8 @@ void UDPC::Context::update_impl() {
// already received packet
UDPC_CHECK_LOG(this,
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet is already marked as received, ignoring it");
"Received packet is already marked as received, "
"ignoring it");
return;
}
iter->second.ack |= 0x80000000 >> (diff - 1);
@ -1749,6 +1810,7 @@ void UDPC::Context::update_impl() {
UDPC_LoggingType::UDPC_VERBOSE,
"Received packet has no payload (probably heartbeat packet)");
}
} while (true);
}
UDPC::Context *UDPC::verifyContext(UDPC_HContext ctx) {