More impl: rtt and timepoint per sent pkt

This commit is contained in:
Stephen Seo 2019-08-22 20:16:07 +09:00
parent 0065928422
commit 7482fecb37
3 changed files with 87 additions and 18 deletions

View file

@ -7,6 +7,7 @@
#define UDPC_BAD_MODE_SEND_INTERVAL (1.0f / 10.0f) #define UDPC_BAD_MODE_SEND_INTERVAL (1.0f / 10.0f)
#define UDPC_SENT_PKTS_MAX_SIZE 33 #define UDPC_SENT_PKTS_MAX_SIZE 33
#define UDPC_QUEUED_PKTS_MAX_SIZE 32 #define UDPC_QUEUED_PKTS_MAX_SIZE 32
#define UDPC_GOOD_RTT_LIMIT_SEC 0.25f
#define UDPC_ID_CONNECT 0x80000000 #define UDPC_ID_CONNECT 0x80000000
#define UDPC_ID_PING 0x40000000 #define UDPC_ID_PING 0x40000000
@ -19,7 +20,9 @@
#include <cstdint> #include <cstdint>
#include <deque> #include <deque>
#include <unordered_map> #include <unordered_map>
#include <queue>
#include <random> #include <random>
#include <memory>
#include "TSQueue.hpp" #include "TSQueue.hpp"
#include "UDPConnection.h" #include "UDPConnection.h"
@ -31,8 +34,18 @@ static const auto INIT_PKT_INTERVAL_DT = std::chrono::seconds(5);
static const auto HEARTBEAT_PKT_INTERVAL_DT = std::chrono::milliseconds(150); static const auto HEARTBEAT_PKT_INTERVAL_DT = std::chrono::milliseconds(150);
static const auto PACKET_TIMEOUT_TIME = std::chrono::seconds(1); static const auto PACKET_TIMEOUT_TIME = std::chrono::seconds(1);
// forward declaration
struct Context; struct Context;
struct SentPktInfo {
typedef std::shared_ptr<SentPktInfo> Ptr;
SentPktInfo();
uint32_t id;
std::chrono::steady_clock::time_point sentTime;
};
struct ConnectionData { struct ConnectionData {
ConnectionData(); ConnectionData();
ConnectionData(bool isServer, Context *ctx); ConnectionData(bool isServer, Context *ctx);
@ -45,6 +58,8 @@ struct ConnectionData {
ConnectionData(ConnectionData&& other) = default; ConnectionData(ConnectionData&& other) = default;
ConnectionData& operator=(ConnectionData&& other) = default; ConnectionData& operator=(ConnectionData&& other) = default;
void cleanupSentPkts();
/* /*
* 0 - trigger send * 0 - trigger send
* 1 - is good mode * 1 - is good mode
@ -66,6 +81,8 @@ struct ConnectionData {
std::deque<UDPC_PacketInfo> sentPkts; std::deque<UDPC_PacketInfo> sentPkts;
TSQueue<UDPC_PacketInfo> sendPkts; TSQueue<UDPC_PacketInfo> sendPkts;
TSQueue<UDPC_PacketInfo> priorityPkts; TSQueue<UDPC_PacketInfo> priorityPkts;
// pkt id to pkt shared_ptr
std::unordered_map<uint32_t, SentPktInfo::Ptr> sentInfoMap;
std::chrono::steady_clock::time_point received; std::chrono::steady_clock::time_point received;
std::chrono::steady_clock::time_point sent; std::chrono::steady_clock::time_point sent;
float rtt; float rtt;
@ -116,6 +133,10 @@ void preparePacket(char *data, uint32_t protocolID, uint32_t conID,
uint32_t generateConnectionID(Context &ctx); uint32_t generateConnectionID(Context &ctx);
float durationToFSec(
const std::chrono::steady_clock::time_point& older,
const std::chrono::steady_clock::time_point& newer);
} // namespace UDPC } // namespace UDPC
#endif #endif

View file

@ -7,6 +7,11 @@
#include <optional> #include <optional>
#include <vector> #include <vector>
UDPC::SentPktInfo::SentPktInfo() :
id(0),
sentTime(std::chrono::steady_clock::now())
{}
UDPC::ConnectionData::ConnectionData() : UDPC::ConnectionData::ConnectionData() :
flags(), flags(),
timer(0.0f), timer(0.0f),
@ -33,6 +38,15 @@ UDPC::ConnectionData::ConnectionData()
} }
} }
void UDPC::ConnectionData::cleanupSentPkts() {
uint32_t id;
while(sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) {
id = *((uint32_t*)(sentPkts.front().data + 8));
sentInfoMap.erase(id);
sentPkts.pop_front();
}
}
UDPC::Context::Context(bool isThreaded) UDPC::Context::Context(bool isThreaded)
: _contextIdentifier(UDPC_CONTEXT_IDENTIFIER), flags(), : _contextIdentifier(UDPC_CONTEXT_IDENTIFIER), flags(),
isAcceptNewConnections(true), protocolID(UDPC_DEFAULT_PROTOCOL_ID), isAcceptNewConnections(true), protocolID(UDPC_DEFAULT_PROTOCOL_ID),
@ -122,6 +136,14 @@ uint32_t UDPC::generateConnectionID(Context &ctx) {
return id; return id;
} }
float UDPC::durationToFSec(
const std::chrono::steady_clock::time_point& older,
const std::chrono::steady_clock::time_point& newer) {
const auto dt = newer - older;
return (float)dt.count()
* (float)decltype(dt)::period::num / (float)decltype(dt)::period::den;
}
void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) { void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) {
UDPC::Context *ctx = new UDPC::Context(false); UDPC::Context *ctx = new UDPC::Context(false);
@ -200,21 +222,16 @@ void UDPC_update(void *ctx) {
return; return;
} }
const auto prevNow = std::move(c->lastUpdated);
const auto now = std::chrono::steady_clock::now(); const auto now = std::chrono::steady_clock::now();
const auto dt = now - c->lastUpdated; c->lastUpdated = now;
const float dt_fs = (float)dt.count() * (float)decltype(dt)::period::num /
(float)decltype(dt)::period::den;
std::chrono::steady_clock::duration temp_dt;
float temp_dt_fs; float temp_dt_fs;
{ {
// check timed out, check good/bad mode with rtt, remove timed out // check timed out, check good/bad mode with rtt, remove timed out
std::vector<uint32_t> removed; std::vector<uint32_t> removed;
for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
temp_dt = now - iter->second.received; temp_dt_fs = UDPC::durationToFSec(iter->second.received, now);
temp_dt_fs = (float)temp_dt.count() *
(float)decltype(temp_dt)::period::num /
(float)decltype(temp_dt)::period::den;
if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) { if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) {
removed.push_back(iter->first); removed.push_back(iter->first);
continue; continue;
@ -384,11 +401,15 @@ void UDPC_update(void *ctx) {
pInfo.receiver = iter->first; pInfo.receiver = iter->first;
pInfo.senderPort = c->socketInfo.sin_port; pInfo.senderPort = c->socketInfo.sin_port;
pInfo.receiverPort = iter->second.port; pInfo.receiverPort = iter->second.port;
*((uint32_t*)(pInfo.data + 8)) = iter->second.lseq - 1;
iter->second.sentPkts.push_back(std::move(pInfo)); iter->second.sentPkts.push_back(std::move(pInfo));
while(iter->second.sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) { iter->second.cleanupSentPkts();
iter->second.sentPkts.pop_front();
} // store other pkt info
UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} else { } else {
// sendPkts or priorityPkts not empty // sendPkts or priorityPkts not empty
UDPC_PacketInfo pInfo; UDPC_PacketInfo pInfo;
@ -440,11 +461,9 @@ void UDPC_update(void *ctx) {
sentPInfo.receiverPort = iter->second.port; sentPInfo.receiverPort = iter->second.port;
iter->second.sentPkts.push_back(std::move(pInfo)); iter->second.sentPkts.push_back(std::move(pInfo));
while(iter->second.sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) { iter->second.cleanupSentPkts();
iter->second.sentPkts.pop_front();
}
} else { } else {
// is not check-received, no data stored but other data is kept // is not check-received, only id stored in data array
UDPC_PacketInfo sentPInfo; UDPC_PacketInfo sentPInfo;
sentPInfo.flags = 0x4; sentPInfo.flags = 0x4;
sentPInfo.dataSize = 0; sentPInfo.dataSize = 0;
@ -452,12 +471,16 @@ void UDPC_update(void *ctx) {
sentPInfo.receiver = iter->first; sentPInfo.receiver = iter->first;
sentPInfo.senderPort = c->socketInfo.sin_port; sentPInfo.senderPort = c->socketInfo.sin_port;
sentPInfo.receiverPort = iter->second.port; sentPInfo.receiverPort = iter->second.port;
*((uint32_t*)(sentPInfo.data + 8)) = iter->second.lseq - 1;
iter->second.sentPkts.push_back(std::move(pInfo)); iter->second.sentPkts.push_back(std::move(pInfo));
while(iter->second.sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) { iter->second.cleanupSentPkts();
iter->second.sentPkts.pop_front();
}
} }
// store other pkt info
UDPC::SentPktInfo::Ptr sentPktInfo = std::make_shared<UDPC::SentPktInfo>();
sentPktInfo->id = iter->second.lseq - 1;
iter->second.sentInfoMap.insert(std::make_pair(sentPktInfo->id, sentPktInfo));
} }
} }
@ -541,6 +564,30 @@ void UDPC_update(void *ctx) {
// packet is valid // packet is valid
// TODO log received valid packet // TODO log received valid packet
// update rtt
for(auto sentIter = iter->second.sentPkts.rbegin(); sentIter != iter->second.sentPkts.rend(); ++sentIter) {
uint32_t id = *((uint32_t*)(sentIter->data + 8));
if(id == rseq) {
auto sentInfoIter = iter->second.sentInfoMap.find(id);
assert(sentInfoIter != iter->second.sentInfoMap.end()
&& "sentInfoMap should have known stored id");
float diff = UDPC::durationToFSec(sentInfoIter->second->sentTime, now);
if(diff > iter->second.rtt) {
iter->second.rtt += (diff - iter->second.rtt) / 10.0f;
} else {
iter->second.rtt -= (iter->second.rtt - diff) / 10.0f;
}
iter->second.flags.set(2, iter->second.rtt <= UDPC_GOOD_RTT_LIMIT_SEC);
// TODO verbose log rtt
break;
}
}
iter->second.received = now;
// TODO impl check pkt timeout
// TODO impl // TODO impl
} }

View file

@ -47,6 +47,7 @@ extern "C" {
typedef enum { SILENT, ERROR, WARNING, VERBOSE, INFO } UDPC_LoggingType; typedef enum { SILENT, ERROR, WARNING, VERBOSE, INFO } UDPC_LoggingType;
typedef struct { typedef struct {
// id is stored at offset 8, size 4 (uint32_t) even for "empty" PktInfos
char data[UDPC_PACKET_MAX_SIZE]; char data[UDPC_PACKET_MAX_SIZE];
/* /*
* 0x1 - connect * 0x1 - connect