]> git.seodisparate.com - EN605.607.81.SP22_ASDM_Project/commitdiff
Impl graceful shutdown of backend on failure sprint_03_day_5
authorStephen Seo <seo.disparate@gmail.com>
Fri, 18 Mar 2022 14:43:15 +0000 (23:43 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Fri, 18 Mar 2022 14:43:15 +0000 (23:43 +0900)
back_end/Cargo.lock
back_end/Cargo.toml
back_end/src/db_handler.rs
back_end/src/json_handlers.rs
back_end/src/main.rs

index 200c91fd3391ec873a73b244a272eab8c7639fe1..debb9b5da2d8024c53488f94c542f0d89c13b6d8 100644 (file)
@@ -157,6 +157,7 @@ name = "four_line_dropper_backend"
 version = "0.1.0"
 dependencies = [
  "bytes",
+ "futures",
  "rand",
  "rusqlite",
  "serde_json",
@@ -164,6 +165,21 @@ dependencies = [
  "warp",
 ]
 
+[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
 [[package]]
 name = "futures-channel"
 version = "0.3.21"
@@ -180,6 +196,34 @@ version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
 
+[[package]]
+name = "futures-executor"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "futures-sink"
 version = "0.3.21"
@@ -198,9 +242,13 @@ version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
 dependencies = [
+ "futures-channel",
  "futures-core",
+ "futures-io",
+ "futures-macro",
  "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
  "slab",
index 2149c40b735f03bbb23e8f1d332a607b92030d01..c2804db2472162804a7fad6f324288ddcd947767 100644 (file)
@@ -12,3 +12,4 @@ serde_json = "1.0"
 bytes = "1.1"
 rusqlite = "0.27.0"
 rand = "0.8.4"
+futures = "0.3"
index de42c2b9c4744528ede2ef2d76f584a286fe5cb0..b432653874ea67dc92a757f10076280f065aae0d 100644 (file)
@@ -13,7 +13,7 @@ enum DBFirstRun {
 fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result<Connection, String> {
     if let Ok(conn) = Connection::open(sqlite_path) {
         conn.execute("PRAGMA foreign_keys = ON;", [])
-            .expect("Should be able to enable \"foreign_keys\"");
+            .map_err(|e| format!("Should be able to handle \"foreign_keys\": {:?}", e))?;
         let result = conn.execute(
             "
             CREATE TABLE players (id INTEGER PRIMARY KEY NOT NULL,
@@ -53,53 +53,74 @@ fn init_conn(sqlite_path: &str, first_run: DBFirstRun) -> Result<Connection, Str
         }
         Ok(conn)
     } else {
-        Err(String::from("Failed to get connection"))
+        Err(String::from("Failed to open connection"))
     }
 }
 
-pub fn start_db_handler_thread(rx: Receiver<SyncSender<u32>>, sqlite_path: String) {
+pub fn start_db_handler_thread(
+    rx: Receiver<SyncSender<u32>>,
+    sqlite_path: String,
+    shutdown_tx: SyncSender<()>,
+) {
     thread::spawn(move || {
         // temporarily get conn which should initialize on first setup of db
         if let Ok(_conn) = init_conn(&sqlite_path, DBFirstRun::FirstRun) {
         } else {
             println!("ERROR: Failed init sqlite db connection");
+            shutdown_tx.send(()).ok();
             return;
         }
 
-        loop {
-            let result = rx.recv();
-            //println!("db_handler: Got result from rx");
+        'outer: loop {
+            let rx_recv_result = rx.recv();
+            if let Err(e) = rx_recv_result {
+                println!("Failed to get player_tx: {:?}", e);
+                shutdown_tx.send(()).ok();
+                break;
+            }
+            let player_tx = rx_recv_result.unwrap();
 
-            if let Ok(player_tx) = result {
-                //println!("db_handler: Got player_tx from rx");
-                // got request to create new player, create new player
-                let mut player_id: u32 = thread_rng().gen();
-                let conn = init_conn(&sqlite_path, DBFirstRun::NotFirstRun)
-                    .expect("DB connection should be available");
-                loop {
-                    let mut stmt = conn
-                        .prepare("SELECT id FROM players WHERE id = ?;")
-                        .expect("Should be able to prepare DB statement");
-                    match stmt.query_row([player_id], |_row| Ok(())) {
-                        Ok(_) => {
-                            player_id = thread_rng().gen();
-                        }
-                        Err(_) => break,
+            // got request to create new player, create new player
+            let mut player_id: u32 = thread_rng().gen();
+            let conn_result = init_conn(&sqlite_path, DBFirstRun::NotFirstRun);
+            if let Err(e) = conn_result {
+                println!("Failed to get sqlite db connection: {:?}", e);
+                shutdown_tx.send(()).ok();
+                break;
+            }
+            let conn = conn_result.unwrap();
+            loop {
+                let stmt_result = conn.prepare("SELECT id FROM players WHERE id = ?;");
+                if let Err(e) = stmt_result {
+                    println!("Failed to create sqlite statement: {:?}", e);
+                    shutdown_tx.send(()).ok();
+                    break 'outer;
+                }
+                let mut stmt = stmt_result.unwrap();
+                match stmt.query_row([player_id], |_row| Ok(())) {
+                    Ok(_) => {
+                        player_id = thread_rng().gen();
                     }
+                    Err(_) => break,
                 }
-                conn.execute(
-                    "INSERT INTO players (id, date_added) VALUES (?, datetime());",
-                    [player_id],
-                )
-                .unwrap_or_else(|_| {
-                    panic!("Should be able to insert new player with id {}", player_id)
-                });
-                player_tx
-                    .send(player_id)
-                    .expect("Should be able to send back valid player id");
-            } else {
-                println!("db_handler: Failed to get player_tx");
             }
+            let insert_result = conn.execute(
+                "INSERT INTO players (id, date_added) VALUES (?, datetime());",
+                [player_id],
+            );
+            if let Err(e) = insert_result {
+                println!("Failed to insert into sqlite db: {:?}", e);
+                shutdown_tx.send(()).ok();
+                break 'outer;
+            }
+            let send_result = player_tx.send(player_id);
+            if let Err(e) = send_result {
+                println!("Failed to send back player id: {:?}", e);
+                shutdown_tx.send(()).ok();
+                break 'outer;
+            }
+            send_result.unwrap();
+
             // Pair up players
             // TODO
         } // loop end
index 3d9b6d973afc8749c8e58e7adc8b9fa8de6bc627..dfe2c778b731c5522b4000c57411aef970f6dc35 100644 (file)
@@ -1,15 +1,18 @@
 use std::{
-    sync::mpsc::{sync_channel, Receiver, SyncSender},
+    sync::mpsc::{sync_channel, SyncSender},
     time::Duration,
 };
 
 use serde_json::Value;
 
-pub fn handle_json(root: Value, tx: SyncSender<SyncSender<u32>>) -> Result<String, String> {
+pub fn handle_json(
+    root: Value,
+    tx: SyncSender<SyncSender<u32>>,
+    _shutdown_tx: SyncSender<()>, // maybe used here, not sure if it will be
+) -> Result<String, String> {
     if let Some(Value::String(type_str)) = root.get("type") {
-        let (player_tx, player_rx) = sync_channel::<u32>(8);
         match type_str.as_str() {
-            "pairing_request" => handle_pairing_request(tx, player_tx, player_rx),
+            "pairing_request" => handle_pairing_request(tx),
             "check_pairing" => handle_check_pairing(root),
             "place_token" => handle_place_token(root),
             "disconnect" => handle_disconnect(root),
@@ -21,11 +24,8 @@ pub fn handle_json(root: Value, tx: SyncSender<SyncSender<u32>>) -> Result<Strin
     }
 }
 
-fn handle_pairing_request(
-    tx: SyncSender<SyncSender<u32>>,
-    player_tx: SyncSender<u32>,
-    player_rx: Receiver<u32>,
-) -> Result<String, String> {
+fn handle_pairing_request(tx: SyncSender<SyncSender<u32>>) -> Result<String, String> {
+    let (player_tx, player_rx) = sync_channel::<u32>(1);
     if tx.send(player_tx).is_err() {
         return Err("{\"type\":\"pairing_response\", \"status\":\"internal_error\"}".into());
     }
index 8b82821cf47004fd20f7d177033c5f80f6beb1cf..bdef11e386b73f7ea96edc4d8fdab6f7159317b4 100644 (file)
@@ -6,6 +6,7 @@ const SQLITE_DB_PATH: &str = "./fourLineDropper.db";
 use std::sync::mpsc::{sync_channel, SyncSender};
 
 use db_handler::start_db_handler_thread;
+use tokio::sync::oneshot;
 use warp::{Filter, Rejection};
 
 #[tokio::main]
@@ -13,19 +14,36 @@ async fn main() {
     let (db_tx, db_rx) = sync_channel::<SyncSender<u32>>(32);
     let db_tx_clone = db_tx.clone();
 
-    start_db_handler_thread(db_rx, SQLITE_DB_PATH.into());
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+
+    // Required because shutdown_tx is not cloneable, and its "send" consumes
+    // itself.
+    let (s_helper_tx, s_helper_rx) = sync_channel::<()>(1);
+
+    std::thread::spawn(move || {
+        if let Ok(_unused_value) = s_helper_rx.recv() {
+            shutdown_tx
+                .send(())
+                .expect("Should be able to send shutdown signal");
+        }
+    });
+
+    start_db_handler_thread(db_rx, SQLITE_DB_PATH.into(), s_helper_tx.clone());
 
     let route = warp::body::content_length_limit(1024 * 32)
         .and(warp::body::bytes())
         .and_then(move |bytes: bytes::Bytes| {
             let db_tx_clone = db_tx_clone.clone();
+            let s_helper_tx_clone = s_helper_tx.clone();
             async move {
                 let body_str_result = std::str::from_utf8(bytes.as_ref());
                 if let Ok(body_str) = body_str_result {
                     let json_result = serde_json::from_str(body_str);
                     if let Ok(json_value) = json_result {
-                        Ok(json_handlers::handle_json(json_value, db_tx_clone)
-                            .unwrap_or_else(|e| e))
+                        Ok(
+                            json_handlers::handle_json(json_value, db_tx_clone, s_helper_tx_clone)
+                                .unwrap_or_else(|e| e),
+                        )
                     } else {
                         Ok(String::from("{\"type\": \"invalid_syntax\"}"))
                     }
@@ -35,5 +53,10 @@ async fn main() {
             }
         });
 
-    warp::serve(route).run(([0, 0, 0, 0], 1237)).await;
+    let (_addr, server) =
+        warp::serve(route).bind_with_graceful_shutdown(([0, 0, 0, 0], 1237), async move {
+            shutdown_rx.await.ok();
+        });
+
+    tokio::task::spawn(server).await.unwrap();
 }