From 2f9f18b80a9e2134f674f345e48a5f21de5efadd Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Sun, 18 Feb 2024 21:16:46 +0100 Subject: Refactor stuff. Use Postgres Types --- src/services/ping.rs | 154 +++++++++++++++++++++++---------------------------- 1 file changed, 68 insertions(+), 86 deletions(-) (limited to 'src/services/ping.rs') diff --git a/src/services/ping.rs b/src/services/ping.rs index 9b164c8..9191f86 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,59 +1,58 @@ -use std::str::FromStr; -use std::net::IpAddr; -use std::sync::Arc; - -use axum::extract::ws::WebSocket; -use axum::extract::ws::Message; +use crate::config::Config; +use crate::db::Device; use dashmap::DashMap; +use ipnetwork::IpNetwork; use sqlx::PgPool; +use std::fmt::Display; use time::{Duration, Instant}; use tokio::sync::broadcast::Sender; use tracing::{debug, error, trace}; -use crate::AppState; -use crate::config::Config; -use crate::db::Device; pub type StatusMap = DashMap; #[derive(Debug, Clone)] pub struct Value { - pub ip: String, - pub online: bool + pub ip: IpNetwork, + pub online: bool, } -pub async fn spawn(tx: Sender, config: &Config, device: Device, uuid: String, ping_map: &StatusMap, db: &PgPool) { +pub async fn spawn( + tx: Sender, + config: &Config, + device: Device, + uuid: String, + ping_map: &StatusMap, + db: &PgPool, +) { let timer = Instant::now(); let payload = [0; 8]; - let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); - - let mut msg: Option = None; + let mut msg: Option = None; while msg.is_none() { - let ping = surge_ping::ping( - ping_ip, - &payload - ).await; + let ping = surge_ping::ping(device.ip.ip(), &payload).await; if let Err(ping) = ping { let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. }); if !ping_timeout { error!("{}", ping.to_string()); - msg = Some(BroadcastCommands::Error(uuid.clone())); + msg = Some(BroadcastCommand::error(uuid.clone())); } if timer.elapsed() >= Duration::minutes(config.pingtimeout) { - msg = Some(BroadcastCommands::Timeout(uuid.clone())); + msg = Some(BroadcastCommand::timeout(uuid.clone())); } } else { - let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); + let (_, duration) = ping + .map_err(|err| error!("{}", err.to_string())) + .expect("fatal error"); debug!("ping took {:?}", duration); - msg = Some(BroadcastCommands::Success(uuid.clone())); + msg = Some(BroadcastCommand::success(uuid.clone())); }; } let msg = msg.expect("fatal error"); let _ = tx.send(msg.clone()); - if let BroadcastCommands::Success(..) = msg { + if let BroadcastCommands::Success = msg.command { sqlx::query!( r#" UPDATE devices @@ -62,8 +61,17 @@ pub async fn spawn(tx: Sender, config: &Config, device: Devic "#, timer.elapsed().whole_seconds(), device.id - ).execute(db).await.unwrap(); - ping_map.insert(uuid.clone(), Value { ip: device.ip.clone(), online: true }); + ) + .execute(db) + .await + .unwrap(); + ping_map.insert( + uuid.clone(), + Value { + ip: device.ip, + online: true, + }, + ); tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; } trace!("remove {} from ping_map", uuid); @@ -72,74 +80,48 @@ pub async fn spawn(tx: Sender, config: &Config, device: Devic #[derive(Clone, Debug, PartialEq)] pub enum BroadcastCommands { - Success(String), - Timeout(String), - Error(String), + Success, + Timeout, + Error, } -pub async fn status_websocket(mut socket: WebSocket, state: Arc) { - trace!("wait for ws message (uuid)"); - let msg = socket.recv().await; - let uuid = msg.unwrap().unwrap().into_text().unwrap(); - - trace!("Search for uuid: {}", uuid); - - let eta = get_eta(&state.db).await; - let _ = socket.send(Message::Text(format!("eta_{eta}_{uuid}"))).await; +#[derive(Clone, Debug, PartialEq)] +pub struct BroadcastCommand { + pub uuid: String, + pub command: BroadcastCommands, +} - let device_exists = state.ping_map.contains_key(&uuid); - if device_exists { - let _ = socket.send(process_device(state.clone(), uuid).await).await; - } else { - debug!("didn't find any device"); - let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; - }; +impl Display for BroadcastCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let prefix = match self.command { + BroadcastCommands::Success => "start", + BroadcastCommands::Timeout => "timeout", + BroadcastCommands::Error => "error", + }; - let _ = socket.close().await; + f.write_str(format!("{prefix}_{}", self.uuid).as_str()) + } } -async fn get_eta(db: &PgPool) -> i64 { - let query = sqlx::query!( - r#"SELECT times FROM devices;"# - ).fetch_one(db).await.unwrap(); - - let times = match query.times { - None => { vec![0] }, - Some(t) => t, - }; - times.iter().sum::() / i64::try_from(times.len()).unwrap() +impl BroadcastCommand { + pub fn success(uuid: String) -> Self { + Self { + uuid, + command: BroadcastCommands::Success, + } + } -} + pub fn timeout(uuid: String) -> Self { + Self { + uuid, + command: BroadcastCommands::Timeout, + } + } -async fn process_device(state: Arc, uuid: String) -> Message { - let pm = state.ping_map.clone().into_read_only(); - let device = pm.get(&uuid).expect("fatal error"); - debug!("got device: {} (online: {})", device.ip, device.online); - if device.online { - debug!("already started"); - Message::Text(format!("start_{uuid}")) - } else { - loop { - trace!("wait for tx message"); - let message = state.ping_send.subscribe().recv().await.expect("fatal error"); - trace!("got message {:?}", message); - return match message { - BroadcastCommands::Success(msg_uuid) => { - if msg_uuid != uuid { continue; } - trace!("message == uuid success"); - Message::Text(format!("start_{uuid}")) - }, - BroadcastCommands::Timeout(msg_uuid) => { - if msg_uuid != uuid { continue; } - trace!("message == uuid timeout"); - Message::Text(format!("timeout_{uuid}")) - }, - BroadcastCommands::Error(msg_uuid) => { - if msg_uuid != uuid { continue; } - trace!("message == uuid error"); - Message::Text(format!("error_{uuid}")) - } - } + pub fn error(uuid: String) -> Self { + Self { + uuid, + command: BroadcastCommands::Error, } } } -- cgit v1.2.3