version = "0.1.0"
dependencies = [
"bytes",
+ "futures",
"rand",
"rusqlite",
"serde_json",
"warp",
]
+[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
[[package]]
name = "futures-channel"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+[[package]]
+name = "futures-executor"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
"slab",
bytes = "1.1"
rusqlite = "0.27.0"
rand = "0.8.4"
+futures = "0.3"
fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result<Connection, String> {
if let Ok(conn) = Connection::open(sqlite_path) {
conn.execute("PRAGMA foreign_keys = ON;", [])
- .expect("Should be able to enable \"foreign_keys\"");
+ .map_err(|e| format!("Should be able to handle \"foreign_keys\": {:?}", e))?;
let result = conn.execute(
"
CREATE TABLE players (id INTEGER PRIMARY KEY NOT NULL,
}
Ok(conn)
} else {
- Err(String::from("Failed to get connection"))
+ Err(String::from("Failed to open connection"))
}
}
-pub fn start_db_handler_thread(rx: Receiver<SyncSender<u32>>, sqlite_path: String) {
+pub fn start_db_handler_thread(
+ rx: Receiver<SyncSender<u32>>,
+ sqlite_path: String,
+ shutdown_tx: SyncSender<()>,
+) {
thread::spawn(move || {
// temporarily get conn which should initialize on first setup of db
if let Ok(_conn) = init_conn(&sqlite_path, DBFirstRun::FirstRun) {
} else {
println!("ERROR: Failed init sqlite db connection");
+ shutdown_tx.send(()).ok();
return;
}
- loop {
- let result = rx.recv();
- //println!("db_handler: Got result from rx");
+ 'outer: loop {
+ let rx_recv_result = rx.recv();
+ if let Err(e) = rx_recv_result {
+ println!("Failed to get player_tx: {:?}", e);
+ shutdown_tx.send(()).ok();
+ break;
+ }
+ let player_tx = rx_recv_result.unwrap();
- if let Ok(player_tx) = result {
- //println!("db_handler: Got player_tx from rx");
- // got request to create new player, create new player
- let mut player_id: u32 = thread_rng().gen();
- let conn = init_conn(&sqlite_path, DBFirstRun::NotFirstRun)
- .expect("DB connection should be available");
- loop {
- let mut stmt = conn
- .prepare("SELECT id FROM players WHERE id = ?;")
- .expect("Should be able to prepare DB statement");
- match stmt.query_row([player_id], |_row| Ok(())) {
- Ok(_) => {
- player_id = thread_rng().gen();
- }
- Err(_) => break,
+ // got request to create new player, create new player
+ let mut player_id: u32 = thread_rng().gen();
+ let conn_result = init_conn(&sqlite_path, DBFirstRun::NotFirstRun);
+ if let Err(e) = conn_result {
+ println!("Failed to get sqlite db connection: {:?}", e);
+ shutdown_tx.send(()).ok();
+ break;
+ }
+ let conn = conn_result.unwrap();
+ loop {
+ let stmt_result = conn.prepare("SELECT id FROM players WHERE id = ?;");
+ if let Err(e) = stmt_result {
+ println!("Failed to create sqlite statement: {:?}", e);
+ shutdown_tx.send(()).ok();
+ break 'outer;
+ }
+ let mut stmt = stmt_result.unwrap();
+ match stmt.query_row([player_id], |_row| Ok(())) {
+ Ok(_) => {
+ player_id = thread_rng().gen();
}
+ Err(_) => break,
}
- conn.execute(
- "INSERT INTO players (id, date_added) VALUES (?, datetime());",
- [player_id],
- )
- .unwrap_or_else(|_| {
- panic!("Should be able to insert new player with id {}", player_id)
- });
- player_tx
- .send(player_id)
- .expect("Should be able to send back valid player id");
- } else {
- println!("db_handler: Failed to get player_tx");
}
+ let insert_result = conn.execute(
+ "INSERT INTO players (id, date_added) VALUES (?, datetime());",
+ [player_id],
+ );
+ if let Err(e) = insert_result {
+ println!("Failed to insert into sqlite db: {:?}", e);
+ shutdown_tx.send(()).ok();
+ break 'outer;
+ }
+ let send_result = player_tx.send(player_id);
+ if let Err(e) = send_result {
+ println!("Failed to send back player id: {:?}", e);
+ shutdown_tx.send(()).ok();
+ break 'outer;
+ }
+ send_result.unwrap();
+
// Pair up players
// TODO
} // loop end
use std::{
- sync::mpsc::{sync_channel, Receiver, SyncSender},
+ sync::mpsc::{sync_channel, SyncSender},
time::Duration,
};
use serde_json::Value;
-pub fn handle_json(root: Value, tx: SyncSender<SyncSender<u32>>) -> Result<String, String> {
+pub fn handle_json(
+ root: Value,
+ tx: SyncSender<SyncSender<u32>>,
+ _shutdown_tx: SyncSender<()>, // maybe used here, not sure if it will be
+) -> Result<String, String> {
if let Some(Value::String(type_str)) = root.get("type") {
- let (player_tx, player_rx) = sync_channel::<u32>(8);
match type_str.as_str() {
- "pairing_request" => handle_pairing_request(tx, player_tx, player_rx),
+ "pairing_request" => handle_pairing_request(tx),
"check_pairing" => handle_check_pairing(root),
"place_token" => handle_place_token(root),
"disconnect" => handle_disconnect(root),
}
}
-fn handle_pairing_request(
- tx: SyncSender<SyncSender<u32>>,
- player_tx: SyncSender<u32>,
- player_rx: Receiver<u32>,
-) -> Result<String, String> {
+fn handle_pairing_request(tx: SyncSender<SyncSender<u32>>) -> Result<String, String> {
+ let (player_tx, player_rx) = sync_channel::<u32>(1);
if tx.send(player_tx).is_err() {
return Err("{\"type\":\"pairing_response\", \"status\":\"internal_error\"}".into());
}
use std::sync::mpsc::{sync_channel, SyncSender};
use db_handler::start_db_handler_thread;
+use tokio::sync::oneshot;
use warp::{Filter, Rejection};
#[tokio::main]
let (db_tx, db_rx) = sync_channel::<SyncSender<u32>>(32);
let db_tx_clone = db_tx.clone();
- start_db_handler_thread(db_rx, SQLITE_DB_PATH.into());
+ let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+
+ // Required because shutdown_tx is not cloneable, and its "send" consumes
+ // itself.
+ let (s_helper_tx, s_helper_rx) = sync_channel::<()>(1);
+
+ std::thread::spawn(move || {
+ if let Ok(_unused_value) = s_helper_rx.recv() {
+ shutdown_tx
+ .send(())
+ .expect("Should be able to send shutdown signal");
+ }
+ });
+
+ start_db_handler_thread(db_rx, SQLITE_DB_PATH.into(), s_helper_tx.clone());
let route = warp::body::content_length_limit(1024 * 32)
.and(warp::body::bytes())
.and_then(move |bytes: bytes::Bytes| {
let db_tx_clone = db_tx_clone.clone();
+ let s_helper_tx_clone = s_helper_tx.clone();
async move {
let body_str_result = std::str::from_utf8(bytes.as_ref());
if let Ok(body_str) = body_str_result {
let json_result = serde_json::from_str(body_str);
if let Ok(json_value) = json_result {
- Ok(json_handlers::handle_json(json_value, db_tx_clone)
- .unwrap_or_else(|e| e))
+ Ok(
+ json_handlers::handle_json(json_value, db_tx_clone, s_helper_tx_clone)
+ .unwrap_or_else(|e| e),
+ )
} else {
Ok(String::from("{\"type\": \"invalid_syntax\"}"))
}
}
});
- warp::serve(route).run(([0, 0, 0, 0], 1237)).await;
+ let (_addr, server) =
+ warp::serve(route).bind_with_graceful_shutdown(([0, 0, 0, 0], 1237), async move {
+ shutdown_rx.await.ok();
+ });
+
+ tokio::task::spawn(server).await.unwrap();
}