Some more work on cpp_impl (still WIP)

This commit is contained in:
Stephen Seo 2019-07-25 20:51:08 +09:00
parent 0bd51418a2
commit 5c4360cabe
4 changed files with 293 additions and 27 deletions

View file

@ -28,6 +28,7 @@ class TSQueue {
void clear(); void clear();
void changeCapacity(unsigned int newCapacity); void changeCapacity(unsigned int newCapacity);
unsigned int size(); unsigned int size();
bool empty();
private: private:
std::atomic_bool spinLock; std::atomic_bool spinLock;
@ -100,4 +101,11 @@ unsigned int TSQueue<T>::size() {
return size; return size;
} }
template <typename T>
bool TSQueue<T>::empty() {
// No lock required, since this is calling size() that uses a lock
unsigned int size = this->size();
return size == 0;
}
#endif #endif

View file

@ -3,25 +3,37 @@
#define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3 #define UDPC_CONTEXT_IDENTIFIER 0x902F4DB3
#define UDPC_TIMEOUT_SECONDS 10.0f #define UDPC_TIMEOUT_SECONDS 10.0f
#define UDPC_GOOD_MODE_SEND_INTERVAL (1.0f / 30.0f)
#define UDPC_BAD_MODE_SEND_INTERVAL (1.0f / 10.0f)
#define UDPC_SENT_PKTS_MAX_SIZE 33
#define UDPC_ID_CONNECT 0x80000000
#define UDPC_ID_PING 0x40000000
#define UDPC_ID_NO_REC_CHK 0x20000000
#define UDPC_ID_RESENDING 0x10000000
#include <atomic> #include <atomic>
#include <bitset> #include <bitset>
#include <chrono>
#include <cstdint> #include <cstdint>
#include <deque> #include <deque>
#include <chrono>
#include <unordered_map> #include <unordered_map>
#include "UDPConnection.h"
#include "TSQueue.hpp" #include "TSQueue.hpp"
#include "UDPConnection.h"
namespace UDPC { namespace UDPC {
static uint32_t LOCAL_ADDR = 0;
static const auto INIT_PKT_INTERVAL_DT = std::chrono::seconds(5);
static const auto HEARTBEAT_PKT_INTERVAL_DT = std::chrono::milliseconds(150);
struct ConnectionData { struct ConnectionData {
/* /*
* 0 - trigger send * 0 - trigger send
* 1 - is good mode * 1 - is good mode
* 2 - is good rtt * 2 - is good rtt
* 3 - initiating connection to server * 3 - initiating connection
* 4 - is id set * 4 - is id set
*/ */
std::bitset<32> flags; std::bitset<32> flags;
@ -35,9 +47,9 @@ struct ConnectionData {
float toggledTimer; float toggledTimer;
uint32_t addr; // in network order uint32_t addr; // in network order
uint16_t port; uint16_t port;
std::deque<PacketInfo> sentPkts; std::deque<UDPC_PacketInfo> sentPkts;
TSQueue<PacketInfo> sendPkts; TSQueue<UDPC_PacketInfo> sendPkts;
TSQueue<PacketInfo> priorityPkts; TSQueue<UDPC_PacketInfo> priorityPkts;
std::chrono::steady_clock::time_point received; std::chrono::steady_clock::time_point received;
std::chrono::steady_clock::time_point sent; std::chrono::steady_clock::time_point sent;
float rtt; float rtt;
@ -48,7 +60,8 @@ struct Context {
uint_fast32_t _contextIdentifier; uint_fast32_t _contextIdentifier;
/* /*
* 0 - isThreaded * 0 - is threaded
* 1 - is client
*/ */
std::bitset<32> flags; std::bitset<32> flags;
std::atomic_bool isAcceptNewConnections; std::atomic_bool isAcceptNewConnections;
@ -61,12 +74,24 @@ struct Context {
std::chrono::steady_clock::time_point lastUpdated; std::chrono::steady_clock::time_point lastUpdated;
std::unordered_map<uint32_t, ConnectionData> conMap; std::unordered_map<uint32_t, ConnectionData> conMap;
std::unordered_map<uint32_t, uint32_t> idMap;
}; // struct Context }; // struct Context
Context *verifyContext(void *ctx); Context *verifyContext(void *ctx);
bool isBigEndian(); bool isBigEndian();
/*
* flags:
* - 0x1 - connect
* - 0x2 - ping
* - 0x4 - no_rec_chk
* - 0x8 - resending
*/
void preparePacket(char *data, uint32_t protocolID, uint32_t conID,
uint32_t rseq, uint32_t ack, uint32_t *seqID, int flags);
} // namespace UDPC } // namespace UDPC
#endif #endif

View file

@ -1,7 +1,9 @@
#include "UDPC_Defines.hpp" #include "UDPC_Defines.hpp"
#include "UDPConnection.h" #include "UDPConnection.h"
#include <cassert>
#include <chrono> #include <chrono>
#include <cstring>
#include <optional> #include <optional>
#include <vector> #include <vector>
@ -19,6 +21,14 @@ UDPC::Context::Context(bool isThreaded)
} else { } else {
flags.reset(0); flags.reset(0);
} }
if(UDPC::LOCAL_ADDR == 0) {
if(UDPC::isBigEndian()) {
UDPC::LOCAL_ADDR = 0x7F000001;
} else {
UDPC::LOCAL_ADDR = 0x0100007F;
}
}
} }
UDPC::Context *UDPC::verifyContext(void *ctx) { UDPC::Context *UDPC::verifyContext(void *ctx) {
@ -47,6 +57,33 @@ bool UDPC::isBigEndian() {
return *isBigEndian; return *isBigEndian;
} }
void UDPC::preparePacket(char *data, uint32_t protocolID, uint32_t conID,
uint32_t rseq, uint32_t ack, uint32_t *seqID,
int flags) {
uint32_t temp;
temp = htonl(protocolID);
std::memcpy(data, &temp, 4);
temp = htonl(conID | ((flags & 0x1) != 0 ? UDPC_ID_CONNECT : 0) |
((flags & 0x2) != 0 ? UDPC_ID_PING : 0) |
((flags & 0x4) != 0 ? UDPC_ID_NO_REC_CHK : 0) |
((flags & 0x8) != 0 ? UDPC_ID_RESENDING : 0));
std::memcpy(data + 4, &temp, 4);
if(seqID) {
temp = htonl(*seqID);
++(*seqID);
} else {
temp = 0;
}
std::memcpy(data + 8, &temp, 4);
temp = htonl(rseq);
std::memcpy(data + 12, &temp, 4);
temp = htonl(ack);
std::memcpy(data + 16, &temp, 4);
}
void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) { void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) {
UDPC::Context *ctx = new UDPC::Context(false); UDPC::Context *ctx = new UDPC::Context(false);
@ -71,6 +108,15 @@ void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient) {
return nullptr; return nullptr;
} }
// TODO verify this is necessary to get the listen port
if(ctx->socketInfo.sin_port == 0) {
struct sockaddr_in getInfo;
socklen_t size = sizeof(struct sockaddr_in);
if(getsockname(ctx->socketHandle, (struct sockaddr *)&getInfo, &size) == 0) {
ctx->socketInfo.sin_port = getInfo.sin_port;
}
}
// set non-blocking on socket // set non-blocking on socket
#if UDPC_PLATFORM == UDPC_PLATFORM_MAC || UDPC_PLATFORM == UDPC_PLATFORM_LINUX #if UDPC_PLATFORM == UDPC_PLATFORM_MAC || UDPC_PLATFORM == UDPC_PLATFORM_LINUX
int nonblocking = 1; int nonblocking = 1;
@ -123,6 +169,8 @@ void UDPC_update(void *ctx) {
std::chrono::steady_clock::duration temp_dt; std::chrono::steady_clock::duration temp_dt;
float temp_dt_fs; float temp_dt_fs;
{
// check timed out, check good/bad mode with rtt, remove timed out
std::vector<uint32_t> removed; std::vector<uint32_t> removed;
for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) { for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
temp_dt = now - iter->second.received; temp_dt = now - iter->second.received;
@ -131,6 +179,7 @@ void UDPC_update(void *ctx) {
(float)decltype(temp_dt)::period::den; (float)decltype(temp_dt)::period::den;
if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) { if(temp_dt_fs >= UDPC_TIMEOUT_SECONDS) {
removed.push_back(iter->first); removed.push_back(iter->first);
continue;
// TODO log timed out connection // TODO log timed out connection
} }
@ -139,7 +188,184 @@ void UDPC_update(void *ctx) {
iter->second.toggledTimer += temp_dt_fs; iter->second.toggledTimer += temp_dt_fs;
if(iter->second.flags.test(1) && !iter->second.flags.test(2)) { if(iter->second.flags.test(1) && !iter->second.flags.test(2)) {
// good mode, bad rtt // good mode, bad rtt
// TODO // TODO log switching to bad mode
iter->second.flags.reset(1);
if(iter->second.toggledTimer <= 10.0f) {
iter->second.toggleT *= 2.0f;
}
iter->second.toggledTimer = 0.0f;
} else if(iter->second.flags.test(1)) {
// good mode, good rtt
if(iter->second.toggleTimer >= 10.0f) {
iter->second.toggleTimer = 0.0f;
iter->second.toggleT /= 2.0f;
if(iter->second.toggleT < 1.0f) {
iter->second.toggleT = 1.0f;
}
}
} else if(!iter->second.flags.test(1) &&
iter->second.flags.test(2)) {
// bad mode, good rtt
if(iter->second.toggledTimer >= iter->second.toggleT) {
iter->second.toggleTimer = 0.0f;
iter->second.toggledTimer = 0.0f;
// TODO log switching to good mode
iter->second.flags.set(1);
}
} else {
// bad mode, bad rtt
iter->second.toggledTimer = 0.0f;
}
iter->second.timer += temp_dt_fs;
if(iter->second.timer >= (iter->second.flags.test(1)
? UDPC_GOOD_MODE_SEND_INTERVAL
: UDPC_BAD_MODE_SEND_INTERVAL)) {
iter->second.timer = 0.0f;
iter->second.flags.set(0);
}
}
for(auto iter = removed.begin(); iter != removed.end(); ++iter) {
auto cIter = c->conMap.find(*iter);
assert(cIter != c->conMap.end());
if(cIter->second.flags.test(4)) {
c->idMap.erase(cIter->second.id);
}
c->conMap.erase(cIter);
}
}
// update send (only if triggerSend flag is set)
for(auto iter = c->conMap.begin(); iter != c->conMap.end(); ++iter) {
if(!iter->second.flags.test(0)) {
continue;
}
iter->second.flags.reset(0);
if(iter->second.flags.test(3)) {
if(c->flags.test(1)) {
// is initiating connection to server
auto initDT = now - iter->second.sent;
if(initDT < UDPC::INIT_PKT_INTERVAL_DT) {
continue;
}
iter->second.sent = now;
std::unique_ptr<char[]> buf = std::make_unique<char[]>(20);
UDPC::preparePacket(
buf.get(),
c->protocolID,
0,
0,
0xFFFFFFFF,
nullptr,
0x1);
struct sockaddr_in destinationInfo;
destinationInfo.sin_family = AF_INET;
destinationInfo.sin_addr.s_addr = iter->first;
destinationInfo.sin_port = htons(iter->second.port);
long int sentBytes = sendto(
c->socketHandle,
buf.get(),
20,
0,
(struct sockaddr*) &destinationInfo,
sizeof(struct sockaddr_in));
if(sentBytes != 20) {
// TODO log fail of sending connection-initiate-packet
}
} else {
// is server, initiate connection to client
iter->second.flags.reset(3);
iter->second.sent = now;
std::unique_ptr<char[]> buf = std::make_unique<char[]>(20);
UDPC::preparePacket(
buf.get(),
c->protocolID,
iter->second.id,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0x1);
struct sockaddr_in destinationInfo;
destinationInfo.sin_family = AF_INET;
destinationInfo.sin_addr.s_addr = iter->first;
destinationInfo.sin_port = htons(iter->second.port);
long int sentBytes = sendto(
c->socketHandle,
buf.get(),
20,
0,
(struct sockaddr*) &destinationInfo,
sizeof(struct sockaddr_in));
if(sentBytes != 20) {
// TODO log fail send init connection packet as server
}
}
continue;
}
// Not initiating connection, send as normal on current connection
if(iter->second.sendPkts.empty() && iter->second.priorityPkts.empty()) {
// nothing in queues, send heartbeat packet
auto sentDT = now - iter->second.sent;
if(sentDT < UDPC::HEARTBEAT_PKT_INTERVAL_DT) {
continue;
}
std::unique_ptr<char[]> buf = std::make_unique<char[]>(20);
UDPC::preparePacket(
buf.get(),
c->protocolID,
iter->second.id,
iter->second.rseq,
iter->second.ack,
&iter->second.lseq,
0);
struct sockaddr_in destinationInfo;
destinationInfo.sin_family = AF_INET;
destinationInfo.sin_addr.s_addr = iter->first;
destinationInfo.sin_port = htons(iter->second.port);
long int sentBytes = sendto(
c->socketHandle,
buf.get(),
20,
0,
(struct sockaddr*) &destinationInfo,
sizeof(struct sockaddr_in));
if(sentBytes != 20) {
// TODO log fail send heartbeat packet
}
UDPC_PacketInfo pInfo{{0}, 0, 0, 0, 0, 0, 0};
pInfo.sender = UDPC::LOCAL_ADDR;
pInfo.receiver = iter->first;
pInfo.senderPort = c->socketInfo.sin_port;
pInfo.receiverPort = iter->second.port;
iter->second.sentPkts.push_back(std::move(pInfo));
while(iter->second.sentPkts.size() > UDPC_SENT_PKTS_MAX_SIZE) {
iter->second.sentPkts.pop_front();
}
} else {
// sendPkts or priorityPkts not empty
UDPC_PacketInfo pInfo;
bool isResending = false;
if(!iter->second.priorityPkts.empty()) {
// TODO verify getting struct copy is valid
pInfo = iter->second.priorityPkts.top();
iter->second.priorityPkts.pop();
isResending = true;
} else {
pInfo = iter->second.sendPkts.top();
iter->second.sendPkts.pop();
}
std::unique_ptr<char[]> buf = std::make_unique<char[]>(20 + pInfo.dataSize);
// TODO prepare and send packet
} }
} }
@ -197,13 +423,13 @@ UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType) {
return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType)); return static_cast<UDPC_LoggingType>(c->loggingType.exchange(loggingType));
} }
PacketInfo UDPC_get_received(void *ctx) { UDPC_PacketInfo UDPC_get_received(void *ctx) {
UDPC::Context *c = UDPC::verifyContext(ctx); UDPC::Context *c = UDPC::verifyContext(ctx);
if(!c) { if(!c) {
return PacketInfo{{0}, 0, 0, 0, 0, 0}; return UDPC_PacketInfo{{0}, 0, 0, 0, 0, 0, 0};
} }
// TODO impl // TODO impl
return PacketInfo{{0}, 0, 0, 0, 0, 0}; return UDPC_PacketInfo{{0}, 0, 0, 0, 0, 0, 0};
} }
const char *UDPC_atostr(void *ctx, uint32_t addr) { const char *UDPC_atostr(void *ctx, uint32_t addr) {

View file

@ -48,12 +48,19 @@ typedef enum { SILENT, ERROR, WARNING, VERBOSE, INFO } UDPC_LoggingType;
typedef struct { typedef struct {
char data[UDPC_PACKET_MAX_SIZE]; char data[UDPC_PACKET_MAX_SIZE];
/*
* 0x1 - connect
* 0x2 - ping
* 0x4 - no_rec_chk
* 0x8 - resending
*/
uint32_t flags;
uint16_t dataSize; // zero if invalid uint16_t dataSize; // zero if invalid
uint32_t sender; uint32_t sender;
uint32_t receiver; uint32_t receiver;
uint16_t senderPort; uint16_t senderPort;
uint16_t receiverPort; uint16_t receiverPort;
} PacketInfo; } UDPC_PacketInfo;
void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient); void *UDPC_init(uint16_t listenPort, uint32_t listenAddr, int isClient);
void *UDPC_init_threaded_update(uint16_t listenPort, uint32_t listenAddr, void *UDPC_init_threaded_update(uint16_t listenPort, uint32_t listenAddr,
@ -76,7 +83,7 @@ uint32_t UDPC_set_protocol_id(void *ctx, uint32_t id);
UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType); UDPC_LoggingType set_logging_type(void *ctx, UDPC_LoggingType loggingType);
PacketInfo UDPC_get_received(void *ctx); UDPC_PacketInfo UDPC_get_received(void *ctx);
const char *UDPC_atostr(void *ctx, uint32_t addr); const char *UDPC_atostr(void *ctx, uint32_t addr);