From a197f4721d3b6e79c73f16c8db69ae9f3154acec Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Wed, 8 Nov 2023 12:44:17 +0100 Subject: add eta with average startup time --- src/services/ping.rs | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) (limited to 'src/services') diff --git a/src/services/ping.rs b/src/services/ping.rs index 67acc1c..2bff61f 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -5,11 +5,13 @@ use std::sync::Arc; use axum::extract::{ws::WebSocket}; use axum::extract::ws::Message; use dashmap::DashMap; +use sqlx::PgPool; use time::{Duration, Instant}; use tokio::sync::broadcast::{Sender}; use tracing::{debug, error, trace}; use crate::AppState; use crate::config::SETTINGS; +use crate::db::Device; pub type PingMap = DashMap; @@ -19,11 +21,11 @@ pub struct PingValue { pub online: bool } -pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: &PingMap) { +pub async fn spawn(tx: Sender, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) { let timer = Instant::now(); let payload = [0; 8]; - let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); + let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); let mut msg: Option = None; while msg.is_none() { @@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender, ip: String, uuid: String, ping let _ = tx.send(msg.clone()); if let BroadcastCommands::Success(..) = msg { - ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); + sqlx::query!( + r#" + UPDATE devices + SET times = array_append(times, $1) + WHERE id = $2; + "#, + timer.elapsed().whole_seconds(), + device.id + ).execute(db).await.unwrap(); + ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true }); tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; } trace!("remove {} from ping_map", uuid); @@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc) { let msg = socket.recv().await; let uuid = msg.unwrap().unwrap().into_text().unwrap(); - trace!("Search for uuid: {:?}", uuid); + trace!("Search for uuid: {}", uuid); + + let eta = get_eta(&state.db).await; + let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await; let device_exists = state.ping_map.contains_key(&uuid); match device_exists { @@ -87,6 +101,21 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc) { let _ = socket.close().await; } +async fn get_eta(db: &PgPool) -> i64 { + let query = sqlx::query!( + r#"SELECT times FROM devices;"# + ).fetch_optional(db).await.unwrap(); + + match query { + None => { -1 }, + Some(rec) => { + let times = rec.times.unwrap(); + times.iter().sum::() / times.len() as i64 + } + } + +} + async fn process_device(state: Arc, uuid: String) -> Message { let pm = state.ping_map.clone().into_read_only(); let device = pm.get(&uuid).expect("fatal error"); -- cgit v1.2.3 From 075b0bdc47713e303f9954556fa4b4bb472b441a Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Thu, 16 Nov 2023 14:17:03 +0100 Subject: check if already runs and bug fix --- src/routes/start.rs | 15 +++++++++++++-- src/services/ping.rs | 14 ++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) (limited to 'src/services') diff --git a/src/routes/start.rs b/src/routes/start.rs index 401ae97..1555db3 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -42,9 +42,20 @@ pub async fn start(State(state): State>, headers: HeaderMap )?; let dev_id = device.id.clone(); let uuid = if payload.ping.is_some_and(|ping| ping) { - let uuid_gen = Uuid::new_v4().to_string(); + let mut uuid: Option = None; + for (key, value) in state.ping_map.clone() { + if value.ip == device.ip { + debug!("service already exists"); + uuid = Some(key); + break; + } + }; + let uuid_gen = match uuid { + Some(u) => u, + None => Uuid::new_v4().to_string(), + }; let uuid_genc = uuid_gen.clone(); - // TODO: Check if service already runs + tokio::spawn(async move { debug!("init ping service"); state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); diff --git a/src/services/ping.rs b/src/services/ping.rs index 2bff61f..c3bdced 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -104,15 +104,13 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc) { async fn get_eta(db: &PgPool) -> i64 { let query = sqlx::query!( r#"SELECT times FROM devices;"# - ).fetch_optional(db).await.unwrap(); + ).fetch_one(db).await.unwrap(); - match query { - None => { -1 }, - Some(rec) => { - let times = rec.times.unwrap(); - times.iter().sum::() / times.len() as i64 - } - } + let times = match query.times { + None => { vec![0] }, + Some(t) => t, + }; + times.iter().sum::() / times.len() as i64 } -- cgit v1.2.3