From 6c47ce948b6b416b4870694b437d28502c34bbcb Mon Sep 17 00:00:00 2001 From: Stephen Seo Date: Fri, 18 Mar 2022 23:43:15 +0900 Subject: [PATCH] Impl graceful shutdown of backend on failure --- back_end/Cargo.lock | 48 ++++++++++++++++++ back_end/Cargo.toml | 1 + back_end/src/db_handler.rs | 93 +++++++++++++++++++++-------------- back_end/src/json_handlers.rs | 18 +++---- back_end/src/main.rs | 31 ++++++++++-- 5 files changed, 142 insertions(+), 49 deletions(-) diff --git a/back_end/Cargo.lock b/back_end/Cargo.lock index 200c91f..debb9b5 100644 --- a/back_end/Cargo.lock +++ b/back_end/Cargo.lock @@ -157,6 +157,7 @@ name = "four_line_dropper_backend" version = "0.1.0" dependencies = [ "bytes", + "futures", "rand", "rusqlite", "serde_json", @@ -164,6 +165,21 @@ dependencies = [ "warp", ] +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.21" @@ -180,6 +196,34 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.21" @@ -198,9 +242,13 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", diff --git a/back_end/Cargo.toml b/back_end/Cargo.toml index 2149c40..c2804db 100644 --- a/back_end/Cargo.toml +++ b/back_end/Cargo.toml @@ -12,3 +12,4 @@ serde_json = "1.0" bytes = "1.1" rusqlite = "0.27.0" rand = "0.8.4" +futures = "0.3" diff --git a/back_end/src/db_handler.rs b/back_end/src/db_handler.rs index de42c2b..b432653 100644 --- a/back_end/src/db_handler.rs +++ b/back_end/src/db_handler.rs @@ -13,7 +13,7 @@ enum DBFirstRun { fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result { if let Ok(conn) = Connection::open(sqlite_path) { conn.execute("PRAGMA foreign_keys = ON;", []) - .expect("Should be able to enable \"foreign_keys\""); + .map_err(|e| format!("Should be able to handle \"foreign_keys\": {:?}", e))?; let result = conn.execute( " CREATE TABLE players (id INTEGER PRIMARY KEY NOT NULL, @@ -53,53 +53,74 @@ fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result>, sqlite_path: String) { +pub fn start_db_handler_thread( + rx: Receiver>, + sqlite_path: String, + shutdown_tx: SyncSender<()>, +) { thread::spawn(move || { // temporarily get conn which should initialize on first setup of db if let Ok(_conn) = init_conn(&sqlite_path, DBFirstRun::FirstRun) { } else { println!("ERROR: Failed init sqlite db connection"); + shutdown_tx.send(()).ok(); return; } - loop { - let result = rx.recv(); - //println!("db_handler: Got result from rx"); - - if let Ok(player_tx) = result { - //println!("db_handler: Got player_tx from rx"); - // got request to create new player, create new player - let mut player_id: u32 = thread_rng().gen(); - let conn = init_conn(&sqlite_path, DBFirstRun::NotFirstRun) - .expect("DB connection should be available"); - loop { - let mut stmt = conn - .prepare("SELECT id FROM players WHERE id = ?;") - .expect("Should be able to prepare DB statement"); - match stmt.query_row([player_id], |_row| Ok(())) { - Ok(_) => { - player_id = thread_rng().gen(); - } - Err(_) => break, - } - } - conn.execute( - "INSERT INTO players (id, date_added) VALUES (?, datetime());", - [player_id], - ) - .unwrap_or_else(|_| { - panic!("Should be able to insert new player with id {}", player_id) - }); - player_tx - .send(player_id) - .expect("Should be able to send back valid player id"); - } else { - println!("db_handler: Failed to get player_tx"); + 'outer: loop { + let rx_recv_result = rx.recv(); + if let Err(e) = rx_recv_result { + println!("Failed to get player_tx: {:?}", e); + shutdown_tx.send(()).ok(); + break; } + let player_tx = rx_recv_result.unwrap(); + + // got request to create new player, create new player + let mut player_id: u32 = thread_rng().gen(); + let conn_result = init_conn(&sqlite_path, DBFirstRun::NotFirstRun); + if let Err(e) = conn_result { + println!("Failed to get sqlite db connection: {:?}", e); + shutdown_tx.send(()).ok(); + break; + } + let conn = conn_result.unwrap(); + loop { + let stmt_result = conn.prepare("SELECT id FROM players WHERE id = ?;"); + if let Err(e) = stmt_result { + println!("Failed to create sqlite statement: {:?}", e); + shutdown_tx.send(()).ok(); + break 'outer; + } + let mut stmt = stmt_result.unwrap(); + match stmt.query_row([player_id], |_row| Ok(())) { + Ok(_) => { + player_id = thread_rng().gen(); + } + Err(_) => break, + } + } + let insert_result = conn.execute( + "INSERT INTO players (id, date_added) VALUES (?, datetime());", + [player_id], + ); + if let Err(e) = insert_result { + println!("Failed to insert into sqlite db: {:?}", e); + shutdown_tx.send(()).ok(); + break 'outer; + } + let send_result = player_tx.send(player_id); + if let Err(e) = send_result { + println!("Failed to send back player id: {:?}", e); + shutdown_tx.send(()).ok(); + break 'outer; + } + send_result.unwrap(); + // Pair up players // TODO } // loop end diff --git a/back_end/src/json_handlers.rs b/back_end/src/json_handlers.rs index 3d9b6d9..dfe2c77 100644 --- a/back_end/src/json_handlers.rs +++ b/back_end/src/json_handlers.rs @@ -1,15 +1,18 @@ use std::{ - sync::mpsc::{sync_channel, Receiver, SyncSender}, + sync::mpsc::{sync_channel, SyncSender}, time::Duration, }; use serde_json::Value; -pub fn handle_json(root: Value, tx: SyncSender>) -> Result { +pub fn handle_json( + root: Value, + tx: SyncSender>, + _shutdown_tx: SyncSender<()>, // maybe used here, not sure if it will be +) -> Result { if let Some(Value::String(type_str)) = root.get("type") { - let (player_tx, player_rx) = sync_channel::(8); match type_str.as_str() { - "pairing_request" => handle_pairing_request(tx, player_tx, player_rx), + "pairing_request" => handle_pairing_request(tx), "check_pairing" => handle_check_pairing(root), "place_token" => handle_place_token(root), "disconnect" => handle_disconnect(root), @@ -21,11 +24,8 @@ pub fn handle_json(root: Value, tx: SyncSender>) -> Result>, - player_tx: SyncSender, - player_rx: Receiver, -) -> Result { +fn handle_pairing_request(tx: SyncSender>) -> Result { + let (player_tx, player_rx) = sync_channel::(1); if tx.send(player_tx).is_err() { return Err("{\"type\":\"pairing_response\", \"status\":\"internal_error\"}".into()); } diff --git a/back_end/src/main.rs b/back_end/src/main.rs index 8b82821..bdef11e 100644 --- a/back_end/src/main.rs +++ b/back_end/src/main.rs @@ -6,6 +6,7 @@ const SQLITE_DB_PATH: &str = "./fourLineDropper.db"; use std::sync::mpsc::{sync_channel, SyncSender}; use db_handler::start_db_handler_thread; +use tokio::sync::oneshot; use warp::{Filter, Rejection}; #[tokio::main] @@ -13,19 +14,36 @@ async fn main() { let (db_tx, db_rx) = sync_channel::>(32); let db_tx_clone = db_tx.clone(); - start_db_handler_thread(db_rx, SQLITE_DB_PATH.into()); + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + // Required because shutdown_tx is not cloneable, and its "send" consumes + // itself. + let (s_helper_tx, s_helper_rx) = sync_channel::<()>(1); + + std::thread::spawn(move || { + if let Ok(_unused_value) = s_helper_rx.recv() { + shutdown_tx + .send(()) + .expect("Should be able to send shutdown signal"); + } + }); + + start_db_handler_thread(db_rx, SQLITE_DB_PATH.into(), s_helper_tx.clone()); let route = warp::body::content_length_limit(1024 * 32) .and(warp::body::bytes()) .and_then(move |bytes: bytes::Bytes| { let db_tx_clone = db_tx_clone.clone(); + let s_helper_tx_clone = s_helper_tx.clone(); async move { let body_str_result = std::str::from_utf8(bytes.as_ref()); if let Ok(body_str) = body_str_result { let json_result = serde_json::from_str(body_str); if let Ok(json_value) = json_result { - Ok(json_handlers::handle_json(json_value, db_tx_clone) - .unwrap_or_else(|e| e)) + Ok( + json_handlers::handle_json(json_value, db_tx_clone, s_helper_tx_clone) + .unwrap_or_else(|e| e), + ) } else { Ok(String::from("{\"type\": \"invalid_syntax\"}")) } @@ -35,5 +53,10 @@ async fn main() { } }); - warp::serve(route).run(([0, 0, 0, 0], 1237)).await; + let (_addr, server) = + warp::serve(route).bind_with_graceful_shutdown(([0, 0, 0, 0], 1237), async move { + shutdown_rx.await.ok(); + }); + + tokio::task::spawn(server).await.unwrap(); }