summaryrefslogtreecommitdiff
path: root/src/services
diff options
context:
space:
mode:
authorFxQnLr <[email protected]>2023-11-08 12:44:17 +0100
committerFxQnLr <[email protected]>2023-11-08 12:44:17 +0100
commita197f4721d3b6e79c73f16c8db69ae9f3154acec (patch)
tree1d98e3464e8f05be11e1f4f99da904a9b05c5c2a /src/services
parent4f124e6ba636e6191c2960d96d0057f3061988fc (diff)
downloadwebol-a197f4721d3b6e79c73f16c8db69ae9f3154acec.tar
webol-a197f4721d3b6e79c73f16c8db69ae9f3154acec.tar.gz
webol-a197f4721d3b6e79c73f16c8db69ae9f3154acec.zip
add eta with average startup time
Diffstat (limited to 'src/services')
-rw-r--r--src/services/ping.rs37
1 files changed, 33 insertions, 4 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs
index 67acc1c..2bff61f 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -5,11 +5,13 @@ use std::sync::Arc;
5use axum::extract::{ws::WebSocket}; 5use axum::extract::{ws::WebSocket};
6use axum::extract::ws::Message; 6use axum::extract::ws::Message;
7use dashmap::DashMap; 7use dashmap::DashMap;
8use sqlx::PgPool;
8use time::{Duration, Instant}; 9use time::{Duration, Instant};
9use tokio::sync::broadcast::{Sender}; 10use tokio::sync::broadcast::{Sender};
10use tracing::{debug, error, trace}; 11use tracing::{debug, error, trace};
11use crate::AppState; 12use crate::AppState;
12use crate::config::SETTINGS; 13use crate::config::SETTINGS;
14use crate::db::Device;
13 15
14pub type PingMap = DashMap<String, PingValue>; 16pub type PingMap = DashMap<String, PingValue>;
15 17
@@ -19,11 +21,11 @@ pub struct PingValue {
19 pub online: bool 21 pub online: bool
20} 22}
21 23
22pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { 24pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) {
23 let timer = Instant::now(); 25 let timer = Instant::now();
24 let payload = [0; 8]; 26 let payload = [0; 8];
25 27
26 let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); 28 let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip");
27 29
28 let mut msg: Option<BroadcastCommands> = None; 30 let mut msg: Option<BroadcastCommands> = None;
29 while msg.is_none() { 31 while msg.is_none() {
@@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping
52 54
53 let _ = tx.send(msg.clone()); 55 let _ = tx.send(msg.clone());
54 if let BroadcastCommands::Success(..) = msg { 56 if let BroadcastCommands::Success(..) = msg {
55 ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); 57 sqlx::query!(
58 r#"
59 UPDATE devices
60 SET times = array_append(times, $1)
61 WHERE id = $2;
62 "#,
63 timer.elapsed().whole_seconds(),
64 device.id
65 ).execute(db).await.unwrap();
66 ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true });
56 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 67 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
57 } 68 }
58 trace!("remove {} from ping_map", uuid); 69 trace!("remove {} from ping_map", uuid);
@@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
71 let msg = socket.recv().await; 82 let msg = socket.recv().await;
72 let uuid = msg.unwrap().unwrap().into_text().unwrap(); 83 let uuid = msg.unwrap().unwrap().into_text().unwrap();
73 84
74 trace!("Search for uuid: {:?}", uuid); 85 trace!("Search for uuid: {}", uuid);
86
87 let eta = get_eta(&state.db).await;
88 let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await;
75 89
76 let device_exists = state.ping_map.contains_key(&uuid); 90 let device_exists = state.ping_map.contains_key(&uuid);
77 match device_exists { 91 match device_exists {
@@ -87,6 +101,21 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
87 let _ = socket.close().await; 101 let _ = socket.close().await;
88} 102}
89 103
104async fn get_eta(db: &PgPool) -> i64 {
105 let query = sqlx::query!(
106 r#"SELECT times FROM devices;"#
107 ).fetch_optional(db).await.unwrap();
108
109 match query {
110 None => { -1 },
111 Some(rec) => {
112 let times = rec.times.unwrap();
113 times.iter().sum::<i64>() / times.len() as i64
114 }
115 }
116
117}
118
90async fn process_device(state: Arc<AppState>, uuid: String) -> Message { 119async fn process_device(state: Arc<AppState>, uuid: String) -> Message {
91 let pm = state.ping_map.clone().into_read_only(); 120 let pm = state.ping_map.clone().into_read_only();
92 let device = pm.get(&uuid).expect("fatal error"); 121 let device = pm.get(&uuid).expect("fatal error");