]> git.seodisparate.com - mpd_info_screen/commitdiff
Impl. reconnect to MPD if temporarily disconnected
authorStephen Seo <seo.disparate@gmail.com>
Mon, 17 Mar 2025 07:49:29 +0000 (16:49 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Mon, 17 Mar 2025 07:49:29 +0000 (16:49 +0900)
src/display.rs
src/main.rs
src/mpd_handler.rs

index a8360c0d2ab1924ec10eebaa1142cca4e19a950a..85a9e052712399b3de1a04734970735816bdb7f2 100644 (file)
@@ -757,6 +757,20 @@ impl MPDDisplay {
 
         Ok(())
     }
+
+    pub fn is_authenticated(&self) -> bool {
+        self.is_authenticated
+    }
+
+    pub fn get_is_mpd_handler_stopped(&self) -> Result<bool, String> {
+        Ok(self
+            .mpd_handler
+            .as_ref()?
+            .get_state_read_guard()
+            .map_err(|_| String::from("Failed to get inner MPDHandler state"))?
+            .stop_flag
+            .load(Ordering::Acquire))
+    }
 }
 
 impl EventHandler for MPDDisplay {
index aab0b7a2a31e46b58087ca1fd053ea7d4952c084..c94251256105660c55af41ed96bc4657e8d13a06 100644 (file)
@@ -113,7 +113,10 @@ fn main() -> Result<(), String> {
     let mut modifiers_state: ModifiersState = ModifiersState::default();
 
     event_loop.run(move |mut event, _window_target, control_flow| {
-        if !ctx.continuing || ctx.quit_requested {
+        if !ctx.continuing
+            || ctx.quit_requested
+            || (display.is_authenticated() && display.get_is_mpd_handler_stopped().unwrap_or(false))
+        {
             *control_flow = ControlFlow::Exit;
             return;
         }
index 344be93ddedfcf74023e766ac2bc1e65e79993cb..b86e1471be01cfd1c64e8cdbe433ff7b3872c8e1 100644 (file)
@@ -4,13 +4,17 @@ use std::io::{self, Read, Write as IOWrite};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
 use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
+use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
 use std::thread;
 use std::time::{Duration, Instant};
 
 const SLEEP_DURATION: Duration = Duration::from_millis(100);
 const POLL_DURATION: Duration = Duration::from_secs(5);
 const BUF_SIZE: usize = 1024 * 4;
+const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
+const STREAM_TIMEOUT: Duration = Duration::from_secs(5);
+const STREAM_TIMEOUT_CHECK: Duration = Duration::from_millis(4500);
+const RESTART_ZERO_BYTES_COUNT: u32 = 30;
 
 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
 enum PollState {
@@ -80,6 +84,7 @@ pub struct MPDHandlerState {
     pub stop_flag: Arc<AtomicBool>,
     log_level: LogLevel,
     mpd_play_state: MPDPlayState,
+    recv_zero_bytes_count: u32,
 }
 
 fn check_next_chars(
@@ -237,6 +242,62 @@ fn read_line(
     Err((String::from("Newline not reached"), result))
 }
 
+fn restart_stream(state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>) -> Result<(), String> {
+    let peer = state_handle
+        .stream
+        .peer_addr()
+        .map_err(|_| String::from("Failed to get TCP stream peer addr/port"))?;
+    state_handle
+        .stream
+        .shutdown(std::net::Shutdown::Both)
+        .map_err(|_| String::from("Failed to cleanup TCP stream"))?;
+    let reconnect_result = TcpStream::connect_timeout(&peer, CONNECT_TIMEOUT);
+    if let Ok(stream) = reconnect_result {
+        state_handle.stream = stream;
+    } else {
+        log("Failed to reconnect.", LogState::Error, LogLevel::Error);
+        state_handle.stop_flag.store(true, Ordering::Release);
+        return Err(String::from("Could not reconnect to MPD"));
+    }
+    state_handle
+        .stream
+        .set_nonblocking(true)
+        .map_err(|_| String::from("Failed to set non-blocking on restarted TCP stream"))?;
+    state_handle
+        .stream
+        .set_read_timeout(Some(STREAM_TIMEOUT))
+        .map_err(|_| String::from("Failed to set read timeout on restarted TCP stream"))?;
+    state_handle
+        .stream
+        .set_write_timeout(Some(STREAM_TIMEOUT))
+        .map_err(|_| String::from("Failed to set write timeout on restarted TCP stream"))?;
+    state_handle.is_authenticated = false;
+    state_handle.can_authenticate = true;
+    state_handle.recv_zero_bytes_count = 0;
+    log(
+        "Connection restarted.",
+        LogState::Warning,
+        LogLevel::Warning,
+    );
+    Ok(())
+}
+
+fn check_read_write_timeout(
+    pre_inst: Instant,
+    state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>,
+) -> Result<(), String> {
+    if pre_inst.elapsed() > STREAM_TIMEOUT_CHECK {
+        log(
+            "Connection timed-out, restarting...",
+            LogState::Warning,
+            LogLevel::Warning,
+        );
+        restart_stream(state_handle)
+    } else {
+        Ok(())
+    }
+}
+
 impl MPDHandler {
     pub fn new(
         host: Ipv4Addr,
@@ -244,11 +305,9 @@ impl MPDHandler {
         password: String,
         log_level: LogLevel,
     ) -> Result<Self, String> {
-        let stream = TcpStream::connect_timeout(
-            &SocketAddr::new(IpAddr::V4(host), port),
-            Duration::from_secs(5),
-        )
-        .map_err(|_| String::from("Failed to get TCP connection (is MPD running?)"))?;
+        let stream =
+            TcpStream::connect_timeout(&SocketAddr::new(IpAddr::V4(host), port), CONNECT_TIMEOUT)
+                .map_err(|_| String::from("Failed to get TCP connection (is MPD running?)"))?;
 
         let password_is_empty = password.is_empty();
 
@@ -285,6 +344,7 @@ impl MPDHandler {
                 log_level,
                 mpd_play_state: MPDPlayState::Stopped,
                 current_song_album: String::new(),
+                recv_zero_bytes_count: 0,
             })),
         };
 
@@ -401,6 +461,14 @@ impl MPDHandler {
                     .stream
                     .set_nonblocking(true)
                     .map_err(|_| String::from("Failed to set non-blocking on TCP stream"))?;
+                write_handle
+                    .stream
+                    .set_read_timeout(Some(STREAM_TIMEOUT))
+                    .map_err(|_| String::from("Failed to set read timeout on TCP stream"))?;
+                write_handle
+                    .stream
+                    .set_write_timeout(Some(STREAM_TIMEOUT))
+                    .map_err(|_| String::from("Failed to set write timeout on TCP stream"))?;
                 break;
             } else {
                 thread::sleep(POLL_DURATION);
@@ -476,8 +544,10 @@ impl MPDHandler {
             .try_write()
             .map_err(|_| String::from("Failed to get MPDHandler write lock (read_block)"))?;
         let mut read_amount: usize = 0;
+        let pre_read_instant = Instant::now();
         let read_result = write_handle.stream.read(buf);
         if let Err(io_err) = read_result {
+            check_read_write_timeout(pre_read_instant, &mut write_handle)?;
             if io_err.kind() != io::ErrorKind::WouldBlock {
                 return Err(format!("TCP stream error: {io_err}"));
             } else {
@@ -485,9 +555,20 @@ impl MPDHandler {
             }
         } else if let Ok(read_amount_result) = read_result {
             if read_amount_result == 0 {
+                write_handle.recv_zero_bytes_count += 1;
+                if write_handle.recv_zero_bytes_count > RESTART_ZERO_BYTES_COUNT {
+                    log(
+                        "Too many recv-zero-bytes, restarting connection...",
+                        LogState::Warning,
+                        LogLevel::Warning,
+                    );
+                    return restart_stream(&mut write_handle);
+                }
                 return Err(String::from("Got zero bytes from TCP stream"));
+            } else {
+                write_handle.recv_zero_bytes_count = 0;
+                read_amount = read_amount_result;
             }
-            read_amount = read_amount_result;
         }
         let mut buf_vec: Vec<u8> = Vec::from(&buf[0..read_amount]);
 
@@ -799,12 +880,14 @@ impl MPDHandler {
                 && write_handle.can_authenticate
             {
                 let p = write_handle.password.clone();
+                let pre_write_inst = Instant::now();
                 let write_result = write_handle
                     .stream
                     .write(format!("password {p}\n").as_bytes());
                 if write_result.is_ok() {
                     write_handle.poll_state = PollState::Password;
                 } else if let Err(e) = write_result {
+                    check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                     log(
                         format!("Failed to send password for authentication: {e}"),
                         LogState::Error,
@@ -817,10 +900,12 @@ impl MPDHandler {
                 && write_handle.mpd_play_state == MPDPlayState::Playing
             {
                 write_handle.force_get_current_song = false;
+                let pre_write_inst = Instant::now();
                 let write_result = write_handle.stream.write(b"currentsong\n");
                 if write_result.is_ok() {
                     write_handle.poll_state = PollState::CurrentSong;
                 } else if let Err(e) = write_result {
+                    check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                     log(
                         format!("Failed to request song info over stream: {e}"),
                         LogState::Error,
@@ -833,10 +918,12 @@ impl MPDHandler {
                     || write_handle.force_get_status)
             {
                 write_handle.force_get_status = false;
+                let pre_write_inst = Instant::now();
                 let write_result = write_handle.stream.write(b"status\n");
                 if write_result.is_ok() {
                     write_handle.poll_state = PollState::Status;
                 } else if let Err(e) = write_result {
+                    check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                     log(
                         format!("Failed to request status over stream: {e}"),
                         LogState::Error,
@@ -850,12 +937,14 @@ impl MPDHandler {
                 let title = write_handle.current_song_filename.clone();
                 let art_data_length = write_handle.art_data.len();
                 if write_handle.can_get_album_art {
+                    let pre_write_inst = Instant::now();
                     let write_result = write_handle
                         .stream
                         .write(format!("readpicture \"{title}\" {art_data_length}\n").as_bytes());
                     if write_result.is_ok() {
                         write_handle.poll_state = PollState::ReadPicture;
                     } else if let Err(e) = write_result {
+                        check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                         log(
                             format!("Failed to request album art: {e}"),
                             LogState::Error,
@@ -863,12 +952,14 @@ impl MPDHandler {
                         );
                     }
                 } else if write_handle.can_get_album_art_in_dir {
+                    let pre_write_inst = Instant::now();
                     let write_result = write_handle
                         .stream
                         .write(format!("albumart \"{title}\" {art_data_length}\n").as_bytes());
                     if write_result.is_ok() {
                         write_handle.poll_state = PollState::ReadPictureInDir;
                     } else if let Err(e) = write_result {
+                        check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                         log(
                             format!("Failed to request album art in dir: {e}"),
                             LogState::Error,