use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
+use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread;
use std::time::{Duration, Instant};
const SLEEP_DURATION: Duration = Duration::from_millis(100);
const POLL_DURATION: Duration = Duration::from_secs(5);
const BUF_SIZE: usize = 1024 * 4;
+const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
+const STREAM_TIMEOUT: Duration = Duration::from_secs(5);
+const STREAM_TIMEOUT_CHECK: Duration = Duration::from_millis(4500);
+const RESTART_ZERO_BYTES_COUNT: u32 = 30;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum PollState {
pub stop_flag: Arc<AtomicBool>,
log_level: LogLevel,
mpd_play_state: MPDPlayState,
+ recv_zero_bytes_count: u32,
}
fn check_next_chars(
Err((String::from("Newline not reached"), result))
}
+fn restart_stream(state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>) -> Result<(), String> {
+ let peer = state_handle
+ .stream
+ .peer_addr()
+ .map_err(|_| String::from("Failed to get TCP stream peer addr/port"))?;
+ state_handle
+ .stream
+ .shutdown(std::net::Shutdown::Both)
+ .map_err(|_| String::from("Failed to cleanup TCP stream"))?;
+ let reconnect_result = TcpStream::connect_timeout(&peer, CONNECT_TIMEOUT);
+ if let Ok(stream) = reconnect_result {
+ state_handle.stream = stream;
+ } else {
+ log("Failed to reconnect.", LogState::Error, LogLevel::Error);
+ state_handle.stop_flag.store(true, Ordering::Release);
+ return Err(String::from("Could not reconnect to MPD"));
+ }
+ state_handle
+ .stream
+ .set_nonblocking(true)
+ .map_err(|_| String::from("Failed to set non-blocking on restarted TCP stream"))?;
+ state_handle
+ .stream
+ .set_read_timeout(Some(STREAM_TIMEOUT))
+ .map_err(|_| String::from("Failed to set read timeout on restarted TCP stream"))?;
+ state_handle
+ .stream
+ .set_write_timeout(Some(STREAM_TIMEOUT))
+ .map_err(|_| String::from("Failed to set write timeout on restarted TCP stream"))?;
+ state_handle.is_authenticated = false;
+ state_handle.can_authenticate = true;
+ state_handle.recv_zero_bytes_count = 0;
+ log(
+ "Connection restarted.",
+ LogState::Warning,
+ LogLevel::Warning,
+ );
+ Ok(())
+}
+
+fn check_read_write_timeout(
+ pre_inst: Instant,
+ state_handle: &mut RwLockWriteGuard<'_, MPDHandlerState>,
+) -> Result<(), String> {
+ if pre_inst.elapsed() > STREAM_TIMEOUT_CHECK {
+ log(
+ "Connection timed-out, restarting...",
+ LogState::Warning,
+ LogLevel::Warning,
+ );
+ restart_stream(state_handle)
+ } else {
+ Ok(())
+ }
+}
+
impl MPDHandler {
pub fn new(
host: Ipv4Addr,
password: String,
log_level: LogLevel,
) -> Result<Self, String> {
- let stream = TcpStream::connect_timeout(
- &SocketAddr::new(IpAddr::V4(host), port),
- Duration::from_secs(5),
- )
- .map_err(|_| String::from("Failed to get TCP connection (is MPD running?)"))?;
+ let stream =
+ TcpStream::connect_timeout(&SocketAddr::new(IpAddr::V4(host), port), CONNECT_TIMEOUT)
+ .map_err(|_| String::from("Failed to get TCP connection (is MPD running?)"))?;
let password_is_empty = password.is_empty();
log_level,
mpd_play_state: MPDPlayState::Stopped,
current_song_album: String::new(),
+ recv_zero_bytes_count: 0,
})),
};
.stream
.set_nonblocking(true)
.map_err(|_| String::from("Failed to set non-blocking on TCP stream"))?;
+ write_handle
+ .stream
+ .set_read_timeout(Some(STREAM_TIMEOUT))
+ .map_err(|_| String::from("Failed to set read timeout on TCP stream"))?;
+ write_handle
+ .stream
+ .set_write_timeout(Some(STREAM_TIMEOUT))
+ .map_err(|_| String::from("Failed to set write timeout on TCP stream"))?;
break;
} else {
thread::sleep(POLL_DURATION);
.try_write()
.map_err(|_| String::from("Failed to get MPDHandler write lock (read_block)"))?;
let mut read_amount: usize = 0;
+ let pre_read_instant = Instant::now();
let read_result = write_handle.stream.read(buf);
if let Err(io_err) = read_result {
+ check_read_write_timeout(pre_read_instant, &mut write_handle)?;
if io_err.kind() != io::ErrorKind::WouldBlock {
return Err(format!("TCP stream error: {io_err}"));
} else {
}
} else if let Ok(read_amount_result) = read_result {
if read_amount_result == 0 {
+ write_handle.recv_zero_bytes_count += 1;
+ if write_handle.recv_zero_bytes_count > RESTART_ZERO_BYTES_COUNT {
+ log(
+ "Too many recv-zero-bytes, restarting connection...",
+ LogState::Warning,
+ LogLevel::Warning,
+ );
+ return restart_stream(&mut write_handle);
+ }
return Err(String::from("Got zero bytes from TCP stream"));
+ } else {
+ write_handle.recv_zero_bytes_count = 0;
+ read_amount = read_amount_result;
}
- read_amount = read_amount_result;
}
let mut buf_vec: Vec<u8> = Vec::from(&buf[0..read_amount]);
&& write_handle.can_authenticate
{
let p = write_handle.password.clone();
+ let pre_write_inst = Instant::now();
let write_result = write_handle
.stream
.write(format!("password {p}\n").as_bytes());
if write_result.is_ok() {
write_handle.poll_state = PollState::Password;
} else if let Err(e) = write_result {
+ check_read_write_timeout(pre_write_inst, &mut write_handle)?;
log(
format!("Failed to send password for authentication: {e}"),
LogState::Error,
&& write_handle.mpd_play_state == MPDPlayState::Playing
{
write_handle.force_get_current_song = false;
+ let pre_write_inst = Instant::now();
let write_result = write_handle.stream.write(b"currentsong\n");
if write_result.is_ok() {
write_handle.poll_state = PollState::CurrentSong;
} else if let Err(e) = write_result {
+ check_read_write_timeout(pre_write_inst, &mut write_handle)?;
log(
format!("Failed to request song info over stream: {e}"),
LogState::Error,
|| write_handle.force_get_status)
{
write_handle.force_get_status = false;
+ let pre_write_inst = Instant::now();
let write_result = write_handle.stream.write(b"status\n");
if write_result.is_ok() {
write_handle.poll_state = PollState::Status;
} else if let Err(e) = write_result {
+ check_read_write_timeout(pre_write_inst, &mut write_handle)?;
log(
format!("Failed to request status over stream: {e}"),
LogState::Error,
let title = write_handle.current_song_filename.clone();
let art_data_length = write_handle.art_data.len();
if write_handle.can_get_album_art {
+ let pre_write_inst = Instant::now();
let write_result = write_handle
.stream
.write(format!("readpicture \"{title}\" {art_data_length}\n").as_bytes());
if write_result.is_ok() {
write_handle.poll_state = PollState::ReadPicture;
} else if let Err(e) = write_result {
+ check_read_write_timeout(pre_write_inst, &mut write_handle)?;
log(
format!("Failed to request album art: {e}"),
LogState::Error,
);
}
} else if write_handle.can_get_album_art_in_dir {
+ let pre_write_inst = Instant::now();
let write_result = write_handle
.stream
.write(format!("albumart \"{title}\" {art_data_length}\n").as_bytes());
if write_result.is_ok() {
write_handle.poll_state = PollState::ReadPictureInDir;
} else if let Err(e) = write_result {
+ check_read_write_timeout(pre_write_inst, &mut write_handle)?;
log(
format!("Failed to request album art in dir: {e}"),
LogState::Error,