]> git.seodisparate.com - mpd_info_screen/commitdiff
Refactor reconnect lost TCP connection
authorStephen Seo <seo.disparate@gmail.com>
Tue, 18 Mar 2025 02:12:49 +0000 (11:12 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Tue, 18 Mar 2025 02:18:29 +0000 (11:18 +0900)
Remove setting timeouts on TCP connection since non-blocking IO makes it
unnecessary.

More robust error handling when resetting TCP connection.

src/mpd_handler.rs

index e9de41c1fef2062e37389d3eb437d898696e41ed..0704bcfce6f68c1cb7b26ad38f4642c8409571a4 100644 (file)
@@ -12,8 +12,6 @@ const SLEEP_DURATION: Duration = Duration::from_millis(100);
 const POLL_DURATION: Duration = Duration::from_secs(5);
 const BUF_SIZE: usize = 1024 * 4;
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
-const STREAM_TIMEOUT: Duration = Duration::from_secs(5);
-const STREAM_TIMEOUT_CHECK: Duration = Duration::from_millis(4500);
 const RESTART_ZERO_BYTES_COUNT: u32 = 30;
 const PRE_RESTART_WAIT: Duration = Duration::from_secs(2);
 
@@ -243,8 +241,9 @@ fn read_line(
     Err((String::from("Newline not reached"), result))
 }
 
-fn restart_stream(state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>) -> Result<(), String> {
-    thread::sleep(PRE_RESTART_WAIT);
+fn restart_stream_impl(
+    state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>,
+) -> Result<(), String> {
     let peer = state_handle
         .stream
         .peer_addr()
@@ -253,49 +252,30 @@ fn restart_stream(state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>) -> R
         .stream
         .shutdown(std::net::Shutdown::Both)
         .map_err(|_| String::from("Failed to cleanup TCP stream"))?;
-    let reconnect_result = TcpStream::connect_timeout(&peer, CONNECT_TIMEOUT);
-    if let Ok(stream) = reconnect_result {
-        state_handle.stream = stream;
-    } else {
-        log("Failed to reconnect.", LogState::Error, LogLevel::Error);
-        state_handle.stop_flag.store(true, Ordering::Release);
-        return Err(String::from("Could not reconnect to MPD"));
-    }
+    state_handle.stream = TcpStream::connect_timeout(&peer, CONNECT_TIMEOUT)
+        .map_err(|_| String::from("Failed to reconnect"))?;
     state_handle
         .stream
         .set_nonblocking(true)
         .map_err(|_| String::from("Failed to set non-blocking on restarted TCP stream"))?;
-    state_handle
-        .stream
-        .set_read_timeout(Some(STREAM_TIMEOUT))
-        .map_err(|_| String::from("Failed to set read timeout on restarted TCP stream"))?;
-    state_handle
-        .stream
-        .set_write_timeout(Some(STREAM_TIMEOUT))
-        .map_err(|_| String::from("Failed to set write timeout on restarted TCP stream"))?;
-    state_handle.is_authenticated = false;
-    state_handle.can_authenticate = true;
-    state_handle.recv_zero_bytes_count = 0;
-    log(
-        "Connection restarted.",
-        LogState::Warning,
-        LogLevel::Warning,
-    );
     Ok(())
 }
 
-fn check_read_write_timeout(
-    pre_inst: Instant,
+fn restart_stream(
     state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>,
+    log_level: LogLevel,
 ) -> Result<(), String> {
-    if pre_inst.elapsed() > STREAM_TIMEOUT_CHECK {
-        log(
-            "Connection timed-out, restarting...",
-            LogState::Warning,
-            LogLevel::Warning,
-        );
-        restart_stream(state_handle)
+    thread::sleep(PRE_RESTART_WAIT);
+    let result = restart_stream_impl(state_handle);
+    if let Err(e) = result {
+        log("Failed to reconnect.", LogState::Error, LogLevel::Error);
+        state_handle.stop_flag.store(true, Ordering::Release);
+        Err(e)
     } else {
+        log("Connection restarted.", LogState::Warning, log_level);
+        state_handle.is_authenticated = false;
+        state_handle.can_authenticate = true;
+        state_handle.recv_zero_bytes_count = 0;
         Ok(())
     }
 }
@@ -463,14 +443,6 @@ impl MPDHandler {
                     .stream
                     .set_nonblocking(true)
                     .map_err(|_| String::from("Failed to set non-blocking on TCP stream"))?;
-                write_handle
-                    .stream
-                    .set_read_timeout(Some(STREAM_TIMEOUT))
-                    .map_err(|_| String::from("Failed to set read timeout on TCP stream"))?;
-                write_handle
-                    .stream
-                    .set_write_timeout(Some(STREAM_TIMEOUT))
-                    .map_err(|_| String::from("Failed to set write timeout on TCP stream"))?;
                 break;
             } else {
                 thread::sleep(POLL_DURATION);
@@ -546,10 +518,8 @@ impl MPDHandler {
             .try_write()
             .map_err(|_| String::from("Failed to get MPDHandler write lock (read_block)"))?;
         let mut read_amount: usize = 0;
-        let pre_read_instant = Instant::now();
         let read_result = write_handle.stream.read(buf);
         if let Err(io_err) = read_result {
-            check_read_write_timeout(pre_read_instant, &mut write_handle)?;
             if io_err.kind() != io::ErrorKind::WouldBlock {
                 return Err(format!("TCP stream error: {io_err}"));
             } else {
@@ -560,12 +530,13 @@ impl MPDHandler {
                 write_handle.recv_zero_bytes_count += 1;
                 if write_handle.recv_zero_bytes_count > RESTART_ZERO_BYTES_COUNT {
                     write_handle.recv_zero_bytes_count = 0;
+                    let log_level = write_handle.log_level;
                     log(
                         "Too many recv-zero-bytes, restarting connection...",
                         LogState::Warning,
-                        LogLevel::Warning,
+                        log_level,
                     );
-                    return restart_stream(&mut write_handle);
+                    return restart_stream(&mut write_handle, log_level);
                 }
                 return Err(String::from("Got zero bytes from TCP stream"));
             } else {
@@ -883,14 +854,12 @@ impl MPDHandler {
                 && write_handle.can_authenticate
             {
                 let p = write_handle.password.clone();
-                let pre_write_inst = Instant::now();
                 let write_result = write_handle
                     .stream
                     .write(format!("password {p}\n").as_bytes());
                 if write_result.is_ok() {
                     write_handle.poll_state = PollState::Password;
                 } else if let Err(e) = write_result {
-                    check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                     log(
                         format!("Failed to send password for authentication: {e}"),
                         LogState::Error,
@@ -903,12 +872,10 @@ impl MPDHandler {
                 && write_handle.mpd_play_state == MPDPlayState::Playing
             {
                 write_handle.force_get_current_song = false;
-                let pre_write_inst = Instant::now();
                 let write_result = write_handle.stream.write(b"currentsong\n");
                 if write_result.is_ok() {
                     write_handle.poll_state = PollState::CurrentSong;
                 } else if let Err(e) = write_result {
-                    check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                     log(
                         format!("Failed to request song info over stream: {e}"),
                         LogState::Error,
@@ -921,12 +888,10 @@ impl MPDHandler {
                     || write_handle.force_get_status)
             {
                 write_handle.force_get_status = false;
-                let pre_write_inst = Instant::now();
                 let write_result = write_handle.stream.write(b"status\n");
                 if write_result.is_ok() {
                     write_handle.poll_state = PollState::Status;
                 } else if let Err(e) = write_result {
-                    check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                     log(
                         format!("Failed to request status over stream: {e}"),
                         LogState::Error,
@@ -940,14 +905,12 @@ impl MPDHandler {
                 let title = write_handle.current_song_filename.clone();
                 let art_data_length = write_handle.art_data.len();
                 if write_handle.can_get_album_art {
-                    let pre_write_inst = Instant::now();
                     let write_result = write_handle
                         .stream
                         .write(format!("readpicture \"{title}\" {art_data_length}\n").as_bytes());
                     if write_result.is_ok() {
                         write_handle.poll_state = PollState::ReadPicture;
                     } else if let Err(e) = write_result {
-                        check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                         log(
                             format!("Failed to request album art: {e}"),
                             LogState::Error,
@@ -955,14 +918,12 @@ impl MPDHandler {
                         );
                     }
                 } else if write_handle.can_get_album_art_in_dir {
-                    let pre_write_inst = Instant::now();
                     let write_result = write_handle
                         .stream
                         .write(format!("albumart \"{title}\" {art_data_length}\n").as_bytes());
                     if write_result.is_ok() {
                         write_handle.poll_state = PollState::ReadPictureInDir;
                     } else if let Err(e) = write_result {
-                        check_read_write_timeout(pre_write_inst, &mut write_handle)?;
                         log(
                             format!("Failed to request album art in dir: {e}"),
                             LogState::Error,