From: Stephen Seo Date: Tue, 18 Mar 2025 02:12:49 +0000 (+0900) Subject: Refactor reconnect lost TCP connection X-Git-Tag: 0.5.0^2~3 X-Git-Url: https://git.seodisparate.com/stephenseo/static/git-logo.png?a=commitdiff_plain;h=a5f41bf1dbb1f319904b03aeb87bbcfd9f755f15;p=mpd_info_screen Refactor reconnect lost TCP connection Remove setting timeouts on TCP connection since non-blocking IO makes it unnecessary. More robust error handling when resetting TCP connection. --- diff --git a/src/mpd_handler.rs b/src/mpd_handler.rs index e9de41c..0704bcf 100644 --- a/src/mpd_handler.rs +++ b/src/mpd_handler.rs @@ -12,8 +12,6 @@ const SLEEP_DURATION: Duration = Duration::from_millis(100); const POLL_DURATION: Duration = Duration::from_secs(5); const BUF_SIZE: usize = 1024 * 4; const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); -const STREAM_TIMEOUT: Duration = Duration::from_secs(5); -const STREAM_TIMEOUT_CHECK: Duration = Duration::from_millis(4500); const RESTART_ZERO_BYTES_COUNT: u32 = 30; const PRE_RESTART_WAIT: Duration = Duration::from_secs(2); @@ -243,8 +241,9 @@ fn read_line( Err((String::from("Newline not reached"), result)) } -fn restart_stream(state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>) -> Result<(), String> { - thread::sleep(PRE_RESTART_WAIT); +fn restart_stream_impl( + state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>, +) -> Result<(), String> { let peer = state_handle .stream .peer_addr() @@ -253,49 +252,30 @@ fn restart_stream(state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>) -> R .stream .shutdown(std::net::Shutdown::Both) .map_err(|_| String::from("Failed to cleanup TCP stream"))?; - let reconnect_result = TcpStream::connect_timeout(&peer, CONNECT_TIMEOUT); - if let Ok(stream) = reconnect_result { - state_handle.stream = stream; - } else { - log("Failed to reconnect.", LogState::Error, LogLevel::Error); - state_handle.stop_flag.store(true, Ordering::Release); - return Err(String::from("Could not reconnect to MPD")); - } + state_handle.stream = TcpStream::connect_timeout(&peer, CONNECT_TIMEOUT) + .map_err(|_| String::from("Failed to reconnect"))?; state_handle .stream .set_nonblocking(true) .map_err(|_| String::from("Failed to set non-blocking on restarted TCP stream"))?; - state_handle - .stream - .set_read_timeout(Some(STREAM_TIMEOUT)) - .map_err(|_| String::from("Failed to set read timeout on restarted TCP stream"))?; - state_handle - .stream - .set_write_timeout(Some(STREAM_TIMEOUT)) - .map_err(|_| String::from("Failed to set write timeout on restarted TCP stream"))?; - state_handle.is_authenticated = false; - state_handle.can_authenticate = true; - state_handle.recv_zero_bytes_count = 0; - log( - "Connection restarted.", - LogState::Warning, - LogLevel::Warning, - ); Ok(()) } -fn check_read_write_timeout( - pre_inst: Instant, +fn restart_stream( state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>, + log_level: LogLevel, ) -> Result<(), String> { - if pre_inst.elapsed() > STREAM_TIMEOUT_CHECK { - log( - "Connection timed-out, restarting...", - LogState::Warning, - LogLevel::Warning, - ); - restart_stream(state_handle) + thread::sleep(PRE_RESTART_WAIT); + let result = restart_stream_impl(state_handle); + if let Err(e) = result { + log("Failed to reconnect.", LogState::Error, LogLevel::Error); + state_handle.stop_flag.store(true, Ordering::Release); + Err(e) } else { + log("Connection restarted.", LogState::Warning, log_level); + state_handle.is_authenticated = false; + state_handle.can_authenticate = true; + state_handle.recv_zero_bytes_count = 0; Ok(()) } } @@ -463,14 +443,6 @@ impl MPDHandler { .stream .set_nonblocking(true) .map_err(|_| String::from("Failed to set non-blocking on TCP stream"))?; - write_handle - .stream - .set_read_timeout(Some(STREAM_TIMEOUT)) - .map_err(|_| String::from("Failed to set read timeout on TCP stream"))?; - write_handle - .stream - .set_write_timeout(Some(STREAM_TIMEOUT)) - .map_err(|_| String::from("Failed to set write timeout on TCP stream"))?; break; } else { thread::sleep(POLL_DURATION); @@ -546,10 +518,8 @@ impl MPDHandler { .try_write() .map_err(|_| String::from("Failed to get MPDHandler write lock (read_block)"))?; let mut read_amount: usize = 0; - let pre_read_instant = Instant::now(); let read_result = write_handle.stream.read(buf); if let Err(io_err) = read_result { - check_read_write_timeout(pre_read_instant, &mut write_handle)?; if io_err.kind() != io::ErrorKind::WouldBlock { return Err(format!("TCP stream error: {io_err}")); } else { @@ -560,12 +530,13 @@ impl MPDHandler { write_handle.recv_zero_bytes_count += 1; if write_handle.recv_zero_bytes_count > RESTART_ZERO_BYTES_COUNT { write_handle.recv_zero_bytes_count = 0; + let log_level = write_handle.log_level; log( "Too many recv-zero-bytes, restarting connection...", LogState::Warning, - LogLevel::Warning, + log_level, ); - return restart_stream(&mut write_handle); + return restart_stream(&mut write_handle, log_level); } return Err(String::from("Got zero bytes from TCP stream")); } else { @@ -883,14 +854,12 @@ impl MPDHandler { && write_handle.can_authenticate { let p = write_handle.password.clone(); - let pre_write_inst = Instant::now(); let write_result = write_handle .stream .write(format!("password {p}\n").as_bytes()); if write_result.is_ok() { write_handle.poll_state = PollState::Password; } else if let Err(e) = write_result { - check_read_write_timeout(pre_write_inst, &mut write_handle)?; log( format!("Failed to send password for authentication: {e}"), LogState::Error, @@ -903,12 +872,10 @@ impl MPDHandler { && write_handle.mpd_play_state == MPDPlayState::Playing { write_handle.force_get_current_song = false; - let pre_write_inst = Instant::now(); let write_result = write_handle.stream.write(b"currentsong\n"); if write_result.is_ok() { write_handle.poll_state = PollState::CurrentSong; } else if let Err(e) = write_result { - check_read_write_timeout(pre_write_inst, &mut write_handle)?; log( format!("Failed to request song info over stream: {e}"), LogState::Error, @@ -921,12 +888,10 @@ impl MPDHandler { || write_handle.force_get_status) { write_handle.force_get_status = false; - let pre_write_inst = Instant::now(); let write_result = write_handle.stream.write(b"status\n"); if write_result.is_ok() { write_handle.poll_state = PollState::Status; } else if let Err(e) = write_result { - check_read_write_timeout(pre_write_inst, &mut write_handle)?; log( format!("Failed to request status over stream: {e}"), LogState::Error, @@ -940,14 +905,12 @@ impl MPDHandler { let title = write_handle.current_song_filename.clone(); let art_data_length = write_handle.art_data.len(); if write_handle.can_get_album_art { - let pre_write_inst = Instant::now(); let write_result = write_handle .stream .write(format!("readpicture \"{title}\" {art_data_length}\n").as_bytes()); if write_result.is_ok() { write_handle.poll_state = PollState::ReadPicture; } else if let Err(e) = write_result { - check_read_write_timeout(pre_write_inst, &mut write_handle)?; log( format!("Failed to request album art: {e}"), LogState::Error, @@ -955,14 +918,12 @@ impl MPDHandler { ); } } else if write_handle.can_get_album_art_in_dir { - let pre_write_inst = Instant::now(); let write_result = write_handle .stream .write(format!("albumart \"{title}\" {art_data_length}\n").as_bytes()); if write_result.is_ok() { write_handle.poll_state = PollState::ReadPictureInDir; } else if let Err(e) = write_result { - check_read_write_timeout(pre_write_inst, &mut write_handle)?; log( format!("Failed to request album art in dir: {e}"), LogState::Error,