Impl graceful shutdown of backend on failure
This commit is contained in:
parent
13f500b4f5
commit
6c47ce948b
5 changed files with 142 additions and 49 deletions
48
back_end/Cargo.lock
generated
48
back_end/Cargo.lock
generated
|
@ -157,6 +157,7 @@ name = "four_line_dropper_backend"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"futures",
|
||||||
"rand",
|
"rand",
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -164,6 +165,21 @@ dependencies = [
|
||||||
"warp",
|
"warp",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures"
|
||||||
|
version = "0.3.21"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
|
||||||
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
|
"futures-core",
|
||||||
|
"futures-executor",
|
||||||
|
"futures-io",
|
||||||
|
"futures-sink",
|
||||||
|
"futures-task",
|
||||||
|
"futures-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-channel"
|
name = "futures-channel"
|
||||||
version = "0.3.21"
|
version = "0.3.21"
|
||||||
|
@ -180,6 +196,34 @@ version = "0.3.21"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
|
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-executor"
|
||||||
|
version = "0.3.21"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-task",
|
||||||
|
"futures-util",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-io"
|
||||||
|
version = "0.3.21"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-macro"
|
||||||
|
version = "0.3.21"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-sink"
|
name = "futures-sink"
|
||||||
version = "0.3.21"
|
version = "0.3.21"
|
||||||
|
@ -198,9 +242,13 @@ version = "0.3.21"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
|
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"futures-macro",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
"futures-task",
|
"futures-task",
|
||||||
|
"memchr",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"pin-utils",
|
"pin-utils",
|
||||||
"slab",
|
"slab",
|
||||||
|
|
|
@ -12,3 +12,4 @@ serde_json = "1.0"
|
||||||
bytes = "1.1"
|
bytes = "1.1"
|
||||||
rusqlite = "0.27.0"
|
rusqlite = "0.27.0"
|
||||||
rand = "0.8.4"
|
rand = "0.8.4"
|
||||||
|
futures = "0.3"
|
||||||
|
|
|
@ -13,7 +13,7 @@ enum DBFirstRun {
|
||||||
fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result<Connection, String> {
|
fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result<Connection, String> {
|
||||||
if let Ok(conn) = Connection::open(sqlite_path) {
|
if let Ok(conn) = Connection::open(sqlite_path) {
|
||||||
conn.execute("PRAGMA foreign_keys = ON;", [])
|
conn.execute("PRAGMA foreign_keys = ON;", [])
|
||||||
.expect("Should be able to enable \"foreign_keys\"");
|
.map_err(|e| format!("Should be able to handle \"foreign_keys\": {:?}", e))?;
|
||||||
let result = conn.execute(
|
let result = conn.execute(
|
||||||
"
|
"
|
||||||
CREATE TABLE players (id INTEGER PRIMARY KEY NOT NULL,
|
CREATE TABLE players (id INTEGER PRIMARY KEY NOT NULL,
|
||||||
|
@ -53,33 +53,50 @@ fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result<Connection, Str
|
||||||
}
|
}
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
} else {
|
} else {
|
||||||
Err(String::from("Failed to get connection"))
|
Err(String::from("Failed to open connection"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_db_handler_thread(rx: Receiver<SyncSender<u32>>, sqlite_path: String) {
|
pub fn start_db_handler_thread(
|
||||||
|
rx: Receiver<SyncSender<u32>>,
|
||||||
|
sqlite_path: String,
|
||||||
|
shutdown_tx: SyncSender<()>,
|
||||||
|
) {
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
// temporarily get conn which should initialize on first setup of db
|
// temporarily get conn which should initialize on first setup of db
|
||||||
if let Ok(_conn) = init_conn(&sqlite_path, DBFirstRun::FirstRun) {
|
if let Ok(_conn) = init_conn(&sqlite_path, DBFirstRun::FirstRun) {
|
||||||
} else {
|
} else {
|
||||||
println!("ERROR: Failed init sqlite db connection");
|
println!("ERROR: Failed init sqlite db connection");
|
||||||
|
shutdown_tx.send(()).ok();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
'outer: loop {
|
||||||
let result = rx.recv();
|
let rx_recv_result = rx.recv();
|
||||||
//println!("db_handler: Got result from rx");
|
if let Err(e) = rx_recv_result {
|
||||||
|
println!("Failed to get player_tx: {:?}", e);
|
||||||
|
shutdown_tx.send(()).ok();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let player_tx = rx_recv_result.unwrap();
|
||||||
|
|
||||||
if let Ok(player_tx) = result {
|
|
||||||
//println!("db_handler: Got player_tx from rx");
|
|
||||||
// got request to create new player, create new player
|
// got request to create new player, create new player
|
||||||
let mut player_id: u32 = thread_rng().gen();
|
let mut player_id: u32 = thread_rng().gen();
|
||||||
let conn = init_conn(&sqlite_path, DBFirstRun::NotFirstRun)
|
let conn_result = init_conn(&sqlite_path, DBFirstRun::NotFirstRun);
|
||||||
.expect("DB connection should be available");
|
if let Err(e) = conn_result {
|
||||||
|
println!("Failed to get sqlite db connection: {:?}", e);
|
||||||
|
shutdown_tx.send(()).ok();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let conn = conn_result.unwrap();
|
||||||
loop {
|
loop {
|
||||||
let mut stmt = conn
|
let stmt_result = conn.prepare("SELECT id FROM players WHERE id = ?;");
|
||||||
.prepare("SELECT id FROM players WHERE id = ?;")
|
if let Err(e) = stmt_result {
|
||||||
.expect("Should be able to prepare DB statement");
|
println!("Failed to create sqlite statement: {:?}", e);
|
||||||
|
shutdown_tx.send(()).ok();
|
||||||
|
break 'outer;
|
||||||
|
}
|
||||||
|
let mut stmt = stmt_result.unwrap();
|
||||||
match stmt.query_row([player_id], |_row| Ok(())) {
|
match stmt.query_row([player_id], |_row| Ok(())) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
player_id = thread_rng().gen();
|
player_id = thread_rng().gen();
|
||||||
|
@ -87,19 +104,23 @@ pub fn start_db_handler_thread(rx: Receiver<SyncSender<u32>>, sqlite_path: Strin
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.execute(
|
let insert_result = conn.execute(
|
||||||
"INSERT INTO players (id, date_added) VALUES (?, datetime());",
|
"INSERT INTO players (id, date_added) VALUES (?, datetime());",
|
||||||
[player_id],
|
[player_id],
|
||||||
)
|
);
|
||||||
.unwrap_or_else(|_| {
|
if let Err(e) = insert_result {
|
||||||
panic!("Should be able to insert new player with id {}", player_id)
|
println!("Failed to insert into sqlite db: {:?}", e);
|
||||||
});
|
shutdown_tx.send(()).ok();
|
||||||
player_tx
|
break 'outer;
|
||||||
.send(player_id)
|
|
||||||
.expect("Should be able to send back valid player id");
|
|
||||||
} else {
|
|
||||||
println!("db_handler: Failed to get player_tx");
|
|
||||||
}
|
}
|
||||||
|
let send_result = player_tx.send(player_id);
|
||||||
|
if let Err(e) = send_result {
|
||||||
|
println!("Failed to send back player id: {:?}", e);
|
||||||
|
shutdown_tx.send(()).ok();
|
||||||
|
break 'outer;
|
||||||
|
}
|
||||||
|
send_result.unwrap();
|
||||||
|
|
||||||
// Pair up players
|
// Pair up players
|
||||||
// TODO
|
// TODO
|
||||||
} // loop end
|
} // loop end
|
||||||
|
|
|
@ -1,15 +1,18 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::mpsc::{sync_channel, Receiver, SyncSender},
|
sync::mpsc::{sync_channel, SyncSender},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
pub fn handle_json(root: Value, tx: SyncSender<SyncSender<u32>>) -> Result<String, String> {
|
pub fn handle_json(
|
||||||
|
root: Value,
|
||||||
|
tx: SyncSender<SyncSender<u32>>,
|
||||||
|
_shutdown_tx: SyncSender<()>, // maybe used here, not sure if it will be
|
||||||
|
) -> Result<String, String> {
|
||||||
if let Some(Value::String(type_str)) = root.get("type") {
|
if let Some(Value::String(type_str)) = root.get("type") {
|
||||||
let (player_tx, player_rx) = sync_channel::<u32>(8);
|
|
||||||
match type_str.as_str() {
|
match type_str.as_str() {
|
||||||
"pairing_request" => handle_pairing_request(tx, player_tx, player_rx),
|
"pairing_request" => handle_pairing_request(tx),
|
||||||
"check_pairing" => handle_check_pairing(root),
|
"check_pairing" => handle_check_pairing(root),
|
||||||
"place_token" => handle_place_token(root),
|
"place_token" => handle_place_token(root),
|
||||||
"disconnect" => handle_disconnect(root),
|
"disconnect" => handle_disconnect(root),
|
||||||
|
@ -21,11 +24,8 @@ pub fn handle_json(root: Value, tx: SyncSender<SyncSender<u32>>) -> Result<Strin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_pairing_request(
|
fn handle_pairing_request(tx: SyncSender<SyncSender<u32>>) -> Result<String, String> {
|
||||||
tx: SyncSender<SyncSender<u32>>,
|
let (player_tx, player_rx) = sync_channel::<u32>(1);
|
||||||
player_tx: SyncSender<u32>,
|
|
||||||
player_rx: Receiver<u32>,
|
|
||||||
) -> Result<String, String> {
|
|
||||||
if tx.send(player_tx).is_err() {
|
if tx.send(player_tx).is_err() {
|
||||||
return Err("{\"type\":\"pairing_response\", \"status\":\"internal_error\"}".into());
|
return Err("{\"type\":\"pairing_response\", \"status\":\"internal_error\"}".into());
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ const SQLITE_DB_PATH: &str = "./fourLineDropper.db";
|
||||||
use std::sync::mpsc::{sync_channel, SyncSender};
|
use std::sync::mpsc::{sync_channel, SyncSender};
|
||||||
|
|
||||||
use db_handler::start_db_handler_thread;
|
use db_handler::start_db_handler_thread;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
use warp::{Filter, Rejection};
|
use warp::{Filter, Rejection};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
@ -13,19 +14,36 @@ async fn main() {
|
||||||
let (db_tx, db_rx) = sync_channel::<SyncSender<u32>>(32);
|
let (db_tx, db_rx) = sync_channel::<SyncSender<u32>>(32);
|
||||||
let db_tx_clone = db_tx.clone();
|
let db_tx_clone = db_tx.clone();
|
||||||
|
|
||||||
start_db_handler_thread(db_rx, SQLITE_DB_PATH.into());
|
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||||
|
|
||||||
|
// Required because shutdown_tx is not cloneable, and its "send" consumes
|
||||||
|
// itself.
|
||||||
|
let (s_helper_tx, s_helper_rx) = sync_channel::<()>(1);
|
||||||
|
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
if let Ok(_unused_value) = s_helper_rx.recv() {
|
||||||
|
shutdown_tx
|
||||||
|
.send(())
|
||||||
|
.expect("Should be able to send shutdown signal");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
start_db_handler_thread(db_rx, SQLITE_DB_PATH.into(), s_helper_tx.clone());
|
||||||
|
|
||||||
let route = warp::body::content_length_limit(1024 * 32)
|
let route = warp::body::content_length_limit(1024 * 32)
|
||||||
.and(warp::body::bytes())
|
.and(warp::body::bytes())
|
||||||
.and_then(move |bytes: bytes::Bytes| {
|
.and_then(move |bytes: bytes::Bytes| {
|
||||||
let db_tx_clone = db_tx_clone.clone();
|
let db_tx_clone = db_tx_clone.clone();
|
||||||
|
let s_helper_tx_clone = s_helper_tx.clone();
|
||||||
async move {
|
async move {
|
||||||
let body_str_result = std::str::from_utf8(bytes.as_ref());
|
let body_str_result = std::str::from_utf8(bytes.as_ref());
|
||||||
if let Ok(body_str) = body_str_result {
|
if let Ok(body_str) = body_str_result {
|
||||||
let json_result = serde_json::from_str(body_str);
|
let json_result = serde_json::from_str(body_str);
|
||||||
if let Ok(json_value) = json_result {
|
if let Ok(json_value) = json_result {
|
||||||
Ok(json_handlers::handle_json(json_value, db_tx_clone)
|
Ok(
|
||||||
.unwrap_or_else(|e| e))
|
json_handlers::handle_json(json_value, db_tx_clone, s_helper_tx_clone)
|
||||||
|
.unwrap_or_else(|e| e),
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
Ok(String::from("{\"type\": \"invalid_syntax\"}"))
|
Ok(String::from("{\"type\": \"invalid_syntax\"}"))
|
||||||
}
|
}
|
||||||
|
@ -35,5 +53,10 @@ async fn main() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
warp::serve(route).run(([0, 0, 0, 0], 1237)).await;
|
let (_addr, server) =
|
||||||
|
warp::serve(route).bind_with_graceful_shutdown(([0, 0, 0, 0], 1237), async move {
|
||||||
|
shutdown_rx.await.ok();
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::task::spawn(server).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue