diff options
Diffstat (limited to 'src/services')
-rw-r--r-- | src/services/ping.rs | 84 |
1 files changed, 39 insertions, 45 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs index c3bdced..9b164c8 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -2,26 +2,26 @@ use std::str::FromStr; | |||
2 | use std::net::IpAddr; | 2 | use std::net::IpAddr; |
3 | use std::sync::Arc; | 3 | use std::sync::Arc; |
4 | 4 | ||
5 | use axum::extract::{ws::WebSocket}; | 5 | use axum::extract::ws::WebSocket; |
6 | use axum::extract::ws::Message; | 6 | use axum::extract::ws::Message; |
7 | use dashmap::DashMap; | 7 | use dashmap::DashMap; |
8 | use sqlx::PgPool; | 8 | use sqlx::PgPool; |
9 | use time::{Duration, Instant}; | 9 | use time::{Duration, Instant}; |
10 | use tokio::sync::broadcast::{Sender}; | 10 | use tokio::sync::broadcast::Sender; |
11 | use tracing::{debug, error, trace}; | 11 | use tracing::{debug, error, trace}; |
12 | use crate::AppState; | 12 | use crate::AppState; |
13 | use crate::config::SETTINGS; | 13 | use crate::config::Config; |
14 | use crate::db::Device; | 14 | use crate::db::Device; |
15 | 15 | ||
16 | pub type PingMap = DashMap<String, PingValue>; | 16 | pub type StatusMap = DashMap<String, Value>; |
17 | 17 | ||
18 | #[derive(Debug, Clone)] | 18 | #[derive(Debug, Clone)] |
19 | pub struct PingValue { | 19 | pub struct Value { |
20 | pub ip: String, | 20 | pub ip: String, |
21 | pub online: bool | 21 | pub online: bool |
22 | } | 22 | } |
23 | 23 | ||
24 | pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) { | 24 | pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Device, uuid: String, ping_map: &StatusMap, db: &PgPool) { |
25 | let timer = Instant::now(); | 25 | let timer = Instant::now(); |
26 | let payload = [0; 8]; | 26 | let payload = [0; 8]; |
27 | 27 | ||
@@ -40,7 +40,7 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, | |||
40 | error!("{}", ping.to_string()); | 40 | error!("{}", ping.to_string()); |
41 | msg = Some(BroadcastCommands::Error(uuid.clone())); | 41 | msg = Some(BroadcastCommands::Error(uuid.clone())); |
42 | } | 42 | } |
43 | if timer.elapsed() >= Duration::minutes(SETTINGS.get_int("pingtimeout").unwrap_or(10)) { | 43 | if timer.elapsed() >= Duration::minutes(config.pingtimeout) { |
44 | msg = Some(BroadcastCommands::Timeout(uuid.clone())); | 44 | msg = Some(BroadcastCommands::Timeout(uuid.clone())); |
45 | } | 45 | } |
46 | } else { | 46 | } else { |
@@ -63,7 +63,7 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, | |||
63 | timer.elapsed().whole_seconds(), | 63 | timer.elapsed().whole_seconds(), |
64 | device.id | 64 | device.id |
65 | ).execute(db).await.unwrap(); | 65 | ).execute(db).await.unwrap(); |
66 | ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true }); | 66 | ping_map.insert(uuid.clone(), Value { ip: device.ip.clone(), online: true }); |
67 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 67 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; |
68 | } | 68 | } |
69 | trace!("remove {} from ping_map", uuid); | 69 | trace!("remove {} from ping_map", uuid); |
@@ -85,17 +85,14 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | |||
85 | trace!("Search for uuid: {}", uuid); | 85 | trace!("Search for uuid: {}", uuid); |
86 | 86 | ||
87 | let eta = get_eta(&state.db).await; | 87 | let eta = get_eta(&state.db).await; |
88 | let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await; | 88 | let _ = socket.send(Message::Text(format!("eta_{eta}_{uuid}"))).await; |
89 | 89 | ||
90 | let device_exists = state.ping_map.contains_key(&uuid); | 90 | let device_exists = state.ping_map.contains_key(&uuid); |
91 | match device_exists { | 91 | if device_exists { |
92 | true => { | 92 | let _ = socket.send(process_device(state.clone(), uuid).await).await; |
93 | let _ = socket.send(process_device(state.clone(), uuid).await).await; | 93 | } else { |
94 | }, | 94 | debug!("didn't find any device"); |
95 | false => { | 95 | let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; |
96 | debug!("didn't find any device"); | ||
97 | let _ = socket.send(Message::Text(format!("notfound_{}", uuid))).await; | ||
98 | }, | ||
99 | }; | 96 | }; |
100 | 97 | ||
101 | let _ = socket.close().await; | 98 | let _ = socket.close().await; |
@@ -110,7 +107,7 @@ async fn get_eta(db: &PgPool) -> i64 { | |||
110 | None => { vec![0] }, | 107 | None => { vec![0] }, |
111 | Some(t) => t, | 108 | Some(t) => t, |
112 | }; | 109 | }; |
113 | times.iter().sum::<i64>() / times.len() as i64 | 110 | times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap() |
114 | 111 | ||
115 | } | 112 | } |
116 | 113 | ||
@@ -118,34 +115,31 @@ async fn process_device(state: Arc<AppState>, uuid: String) -> Message { | |||
118 | let pm = state.ping_map.clone().into_read_only(); | 115 | let pm = state.ping_map.clone().into_read_only(); |
119 | let device = pm.get(&uuid).expect("fatal error"); | 116 | let device = pm.get(&uuid).expect("fatal error"); |
120 | debug!("got device: {} (online: {})", device.ip, device.online); | 117 | debug!("got device: {} (online: {})", device.ip, device.online); |
121 | match device.online { | 118 | if device.online { |
122 | true => { | 119 | debug!("already started"); |
123 | debug!("already started"); | 120 | Message::Text(format!("start_{uuid}")) |
124 | Message::Text(format!("start_{}", uuid)) | 121 | } else { |
125 | }, | 122 | loop { |
126 | false => { | 123 | trace!("wait for tx message"); |
127 | loop{ | 124 | let message = state.ping_send.subscribe().recv().await.expect("fatal error"); |
128 | trace!("wait for tx message"); | 125 | trace!("got message {:?}", message); |
129 | let message = state.ping_send.subscribe().recv().await.expect("fatal error"); | 126 | return match message { |
130 | trace!("got message {:?}", message); | 127 | BroadcastCommands::Success(msg_uuid) => { |
131 | return match message { | 128 | if msg_uuid != uuid { continue; } |
132 | BroadcastCommands::Success(msg_uuid) => { | 129 | trace!("message == uuid success"); |
133 | if msg_uuid != uuid { continue; } | 130 | Message::Text(format!("start_{uuid}")) |
134 | trace!("message == uuid success"); | 131 | }, |
135 | Message::Text(format!("start_{}", uuid)) | 132 | BroadcastCommands::Timeout(msg_uuid) => { |
136 | }, | 133 | if msg_uuid != uuid { continue; } |
137 | BroadcastCommands::Timeout(msg_uuid) => { | 134 | trace!("message == uuid timeout"); |
138 | if msg_uuid != uuid { continue; } | 135 | Message::Text(format!("timeout_{uuid}")) |
139 | trace!("message == uuid timeout"); | 136 | }, |
140 | Message::Text(format!("timeout_{}", uuid)) | 137 | BroadcastCommands::Error(msg_uuid) => { |
141 | }, | 138 | if msg_uuid != uuid { continue; } |
142 | BroadcastCommands::Error(msg_uuid) => { | 139 | trace!("message == uuid error"); |
143 | if msg_uuid != uuid { continue; } | 140 | Message::Text(format!("error_{uuid}")) |
144 | trace!("message == uuid error"); | ||
145 | Message::Text(format!("error_{}", uuid)) | ||
146 | } | ||
147 | } | 141 | } |
148 | } | 142 | } |
149 | } | 143 | } |
150 | } | 144 | } |
151 | } \ No newline at end of file | 145 | } |