aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/db.rs6
-rw-r--r--src/error.rs22
-rw-r--r--src/main.rs54
-rw-r--r--src/routes.rs (renamed from src/routes/mod.rs)0
-rw-r--r--src/routes/device.rs19
-rw-r--r--src/routes/start.rs78
-rw-r--r--src/routes/status.rs79
-rw-r--r--src/services.rs (renamed from src/services/mod.rs)0
-rw-r--r--src/services/ping.rs154
9 files changed, 251 insertions, 161 deletions
diff --git a/src/db.rs b/src/db.rs
index 489a000..47e907d 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,13 +1,13 @@
1use serde::Serialize; 1use serde::Serialize;
2use sqlx::{PgPool, postgres::PgPoolOptions}; 2use sqlx::{PgPool, postgres::PgPoolOptions, types::{ipnetwork::IpNetwork, mac_address::MacAddress}};
3use tracing::{debug, info}; 3use tracing::{debug, info};
4 4
5#[derive(Serialize, Debug)] 5#[derive(Serialize, Debug)]
6pub struct Device { 6pub struct Device {
7 pub id: String, 7 pub id: String,
8 pub mac: String, 8 pub mac: MacAddress,
9 pub broadcast_addr: String, 9 pub broadcast_addr: String,
10 pub ip: String, 10 pub ip: IpNetwork,
11 pub times: Option<Vec<i64>> 11 pub times: Option<Vec<i64>>
12} 12}
13 13
diff --git a/src/error.rs b/src/error.rs
index 63b214e..66a61f4 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -2,6 +2,8 @@ use axum::http::header::ToStrError;
2use axum::http::StatusCode; 2use axum::http::StatusCode;
3use axum::response::{IntoResponse, Response}; 3use axum::response::{IntoResponse, Response};
4use axum::Json; 4use axum::Json;
5use ::ipnetwork::IpNetworkError;
6use mac_address::MacParseError;
5use serde_json::json; 7use serde_json::json;
6use std::io; 8use std::io;
7use tracing::error; 9use tracing::error;
@@ -29,6 +31,18 @@ pub enum Error {
29 source: ToStrError, 31 source: ToStrError,
30 }, 32 },
31 33
34 #[error("string parse: {source}")]
35 IpParse {
36 #[from]
37 source: IpNetworkError,
38 },
39
40 #[error("mac parse: {source}")]
41 MacParse {
42 #[from]
43 source: MacParseError,
44 },
45
32 #[error("io: {source}")] 46 #[error("io: {source}")]
33 Io { 47 Io {
34 #[from] 48 #[from]
@@ -57,6 +71,14 @@ impl IntoResponse for Error {
57 error!("{source}"); 71 error!("{source}");
58 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") 72 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error")
59 } 73 }
74 Self::MacParse { source } => {
75 error!("{source}");
76 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error")
77 }
78 Self::IpParse { source } => {
79 error!("{source}");
80 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error")
81 }
60 }; 82 };
61 let body = Json(json!({ 83 let body = Json(json!({
62 "error": error_message, 84 "error": error_message,
diff --git a/src/main.rs b/src/main.rs
index 4ef129b..7d8c1da 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,42 +1,44 @@
1use std::env;
2use std::sync::Arc;
3use axum::{Router, routing::post};
4use axum::routing::{get, put};
5use dashmap::DashMap;
6use sqlx::PgPool;
7use time::util::local_offset;
8use tokio::sync::broadcast::{channel, Sender};
9use tracing::{info, level_filters::LevelFilter};
10use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*};
11use crate::config::Config; 1use crate::config::Config;
12use crate::db::init_db_pool; 2use crate::db::init_db_pool;
13use crate::routes::device; 3use crate::routes::device;
14use crate::routes::start::start; 4use crate::routes::start::start;
15use crate::routes::status::status; 5use crate::routes::status::status;
16use crate::services::ping::{BroadcastCommands, StatusMap}; 6use crate::services::ping::StatusMap;
7use axum::routing::{get, put};
8use axum::{routing::post, Router};
9use dashmap::DashMap;
10use services::ping::BroadcastCommand;
11use sqlx::PgPool;
12use tracing_subscriber::fmt::time::UtcTime;
13use std::env;
14use std::sync::Arc;
15use tokio::sync::broadcast::{channel, Sender};
16use tracing::{info, level_filters::LevelFilter};
17use tracing_subscriber::{
18 fmt,
19 prelude::*,
20 EnvFilter,
21};
17 22
18mod auth; 23mod auth;
19mod config; 24mod config;
20mod routes;
21mod wol;
22mod db; 25mod db;
23mod error; 26mod error;
27mod routes;
24mod services; 28mod services;
29mod wol;
25 30
26#[tokio::main] 31#[tokio::main]
27async fn main() -> color_eyre::eyre::Result<()> { 32async fn main() -> color_eyre::eyre::Result<()> {
28
29 color_eyre::install()?; 33 color_eyre::install()?;
34
30 35
31 unsafe { local_offset::set_soundness(local_offset::Soundness::Unsound); }
32 let time_format = 36 let time_format =
33 time::macros::format_description!("[year]-[month]-[day] [hour]:[minute]:[second]"); 37 time::macros::format_description!("[year]-[month]-[day] [hour]:[minute]:[second]");
34 let loc = LocalTime::new(time_format); 38 let loc = UtcTime::new(time_format);
35 39
36 tracing_subscriber::registry() 40 tracing_subscriber::registry()
37 .with(fmt::layer() 41 .with(fmt::layer().with_timer(loc))
38 .with_timer(loc)
39 )
40 .with( 42 .with(
41 EnvFilter::builder() 43 EnvFilter::builder()
42 .with_default_directive(LevelFilter::INFO.into()) 44 .with_default_directive(LevelFilter::INFO.into())
@@ -56,8 +58,13 @@ async fn main() -> color_eyre::eyre::Result<()> {
56 let (tx, _) = channel(32); 58 let (tx, _) = channel(32);
57 59
58 let ping_map: StatusMap = DashMap::new(); 60 let ping_map: StatusMap = DashMap::new();
59 61
60 let shared_state = Arc::new(AppState { db, config: config.clone(), ping_send: tx, ping_map }); 62 let shared_state = Arc::new(AppState {
63 db,
64 config: config.clone(),
65 ping_send: tx,
66 ping_map,
67 });
61 68
62 let app = Router::new() 69 let app = Router::new()
63 .route("/start", post(start)) 70 .route("/start", post(start))
@@ -69,8 +76,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
69 76
70 let addr = config.serveraddr; 77 let addr = config.serveraddr;
71 info!("start server on {}", addr); 78 info!("start server on {}", addr);
72 let listener = tokio::net::TcpListener::bind(addr) 79 let listener = tokio::net::TcpListener::bind(addr).await?;
73 .await?;
74 axum::serve(listener, app).await?; 80 axum::serve(listener, app).await?;
75 81
76 Ok(()) 82 Ok(())
@@ -79,6 +85,6 @@ async fn main() -> color_eyre::eyre::Result<()> {
79pub struct AppState { 85pub struct AppState {
80 db: PgPool, 86 db: PgPool,
81 config: Config, 87 config: Config,
82 ping_send: Sender<BroadcastCommands>, 88 ping_send: Sender<BroadcastCommand>,
83 ping_map: StatusMap, 89 ping_map: StatusMap,
84} 90}
diff --git a/src/routes/mod.rs b/src/routes.rs
index d5ab0d6..d5ab0d6 100644
--- a/src/routes/mod.rs
+++ b/src/routes.rs
diff --git a/src/routes/device.rs b/src/routes/device.rs
index 5ca574a..2f0093d 100644
--- a/src/routes/device.rs
+++ b/src/routes/device.rs
@@ -4,9 +4,11 @@ use crate::error::Error;
4use axum::extract::State; 4use axum::extract::State;
5use axum::http::HeaderMap; 5use axum::http::HeaderMap;
6use axum::Json; 6use axum::Json;
7use mac_address::MacAddress;
7use serde::{Deserialize, Serialize}; 8use serde::{Deserialize, Serialize};
8use serde_json::{json, Value}; 9use serde_json::{json, Value};
9use std::sync::Arc; 10use sqlx::types::ipnetwork::IpNetwork;
11use std::{sync::Arc, str::FromStr};
10use tracing::{debug, info}; 12use tracing::{debug, info};
11 13
12pub async fn get( 14pub async fn get(
@@ -14,7 +16,7 @@ pub async fn get(
14 headers: HeaderMap, 16 headers: HeaderMap,
15 Json(payload): Json<GetDevicePayload>, 17 Json(payload): Json<GetDevicePayload>,
16) -> Result<Json<Value>, Error> { 18) -> Result<Json<Value>, Error> {
17 info!("add device {}", payload.id); 19 info!("get device {}", payload.id);
18 let secret = headers.get("authorization"); 20 let secret = headers.get("authorization");
19 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); 21 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success);
20 if authorized { 22 if authorized {
@@ -52,18 +54,21 @@ pub async fn put(
52 "add device {} ({}, {}, {})", 54 "add device {} ({}, {}, {})",
53 payload.id, payload.mac, payload.broadcast_addr, payload.ip 55 payload.id, payload.mac, payload.broadcast_addr, payload.ip
54 ); 56 );
57
55 let secret = headers.get("authorization"); 58 let secret = headers.get("authorization");
56 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); 59 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success);
57 if authorized { 60 if authorized {
61 let ip = IpNetwork::from_str(&payload.ip)?;
62 let mac = MacAddress::from_str(&payload.mac)?;
58 sqlx::query!( 63 sqlx::query!(
59 r#" 64 r#"
60 INSERT INTO devices (id, mac, broadcast_addr, ip) 65 INSERT INTO devices (id, mac, broadcast_addr, ip)
61 VALUES ($1, $2, $3, $4); 66 VALUES ($1, $2, $3, $4);
62 "#, 67 "#,
63 payload.id, 68 payload.id,
64 payload.mac, 69 mac,
65 payload.broadcast_addr, 70 payload.broadcast_addr,
66 payload.ip 71 ip
67 ) 72 )
68 .execute(&state.db) 73 .execute(&state.db)
69 .await?; 74 .await?;
@@ -99,6 +104,8 @@ pub async fn post(
99 let secret = headers.get("authorization"); 104 let secret = headers.get("authorization");
100 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); 105 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success);
101 if authorized { 106 if authorized {
107 let ip = IpNetwork::from_str(&payload.ip)?;
108 let mac = MacAddress::from_str(&payload.mac)?;
102 let device = sqlx::query_as!( 109 let device = sqlx::query_as!(
103 Device, 110 Device,
104 r#" 111 r#"
@@ -106,9 +113,9 @@ pub async fn post(
106 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 113 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4
107 RETURNING id, mac, broadcast_addr, ip, times; 114 RETURNING id, mac, broadcast_addr, ip, times;
108 "#, 115 "#,
109 payload.mac, 116 mac,
110 payload.broadcast_addr, 117 payload.broadcast_addr,
111 payload.ip, 118 ip,
112 payload.id 119 payload.id
113 ) 120 )
114 .fetch_one(&state.db) 121 .fetch_one(&state.db)
diff --git a/src/routes/start.rs b/src/routes/start.rs
index ec4f98f..4888325 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
12use tracing::{debug, info}; 12use tracing::{debug, info};
13use uuid::Uuid; 13use uuid::Uuid;
14 14
15#[axum_macros::debug_handler]
16pub async fn start( 15pub async fn start(
17 State(state): State<Arc<crate::AppState>>, 16 State(state): State<Arc<crate::AppState>>,
18 headers: HeaderMap, 17 headers: HeaderMap,
@@ -41,45 +40,11 @@ pub async fn start(
41 let _ = send_packet( 40 let _ = send_packet(
42 bind_addr, 41 bind_addr,
43 &device.broadcast_addr, 42 &device.broadcast_addr,
44 &create_buffer(&device.mac)?, 43 &create_buffer(&device.mac.to_string())?,
45 )?; 44 )?;
46 let dev_id = device.id.clone(); 45 let dev_id = device.id.clone();
47 let uuid = if payload.ping.is_some_and(|ping| ping) { 46 let uuid = if payload.ping.is_some_and(|ping| ping) {
48 let mut uuid: Option<String> = None; 47 Some(setup_ping(state, device))
49 for (key, value) in state.ping_map.clone() {
50 if value.ip == device.ip {
51 debug!("service already exists");
52 uuid = Some(key);
53 break;
54 }
55 }
56 let uuid_gen = match uuid {
57 Some(u) => u,
58 None => Uuid::new_v4().to_string(),
59 };
60 let uuid_genc = uuid_gen.clone();
61
62 tokio::spawn(async move {
63 debug!("init ping service");
64 state.ping_map.insert(
65 uuid_gen.clone(),
66 PingValue {
67 ip: device.ip.clone(),
68 online: false,
69 },
70 );
71
72 crate::services::ping::spawn(
73 state.ping_send.clone(),
74 &state.config,
75 device,
76 uuid_gen.clone(),
77 &state.ping_map,
78 &state.db,
79 )
80 .await;
81 });
82 Some(uuid_genc)
83 } else { 48 } else {
84 None 49 None
85 }; 50 };
@@ -93,6 +58,45 @@ pub async fn start(
93 } 58 }
94} 59}
95 60
61fn setup_ping(state: Arc<crate::AppState>, device: Device) -> String {
62 let mut uuid: Option<String> = None;
63 for (key, value) in state.ping_map.clone() {
64 if value.ip == device.ip {
65 debug!("service already exists");
66 uuid = Some(key);
67 break;
68 }
69 }
70 let uuid_gen = match uuid {
71 Some(u) => u,
72 None => Uuid::new_v4().to_string(),
73 };
74 let uuid_ret = uuid_gen.clone();
75
76 debug!("init ping service");
77 state.ping_map.insert(
78 uuid_gen.clone(),
79 PingValue {
80 ip: device.ip,
81 online: false,
82 },
83 );
84
85 tokio::spawn(async move {
86 crate::services::ping::spawn(
87 state.ping_send.clone(),
88 &state.config,
89 device,
90 uuid_gen,
91 &state.ping_map,
92 &state.db,
93 )
94 .await;
95 });
96
97 uuid_ret
98}
99
96#[derive(Deserialize)] 100#[derive(Deserialize)]
97pub struct Payload { 101pub struct Payload {
98 id: String, 102 id: String,
diff --git a/src/routes/status.rs b/src/routes/status.rs
index 31ef996..0e25f7d 100644
--- a/src/routes/status.rs
+++ b/src/routes/status.rs
@@ -1,10 +1,79 @@
1use std::sync::Arc; 1use crate::services::ping::BroadcastCommand;
2use crate::AppState;
3use axum::extract::ws::{Message, WebSocket};
2use axum::extract::{State, WebSocketUpgrade}; 4use axum::extract::{State, WebSocketUpgrade};
3use axum::response::Response; 5use axum::response::Response;
4use crate::AppState; 6use sqlx::PgPool;
5use crate::services::ping::status_websocket; 7use std::sync::Arc;
8use tracing::{debug, trace};
6 9
7#[axum_macros::debug_handler]
8pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { 10pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response {
9 ws.on_upgrade(move |socket| status_websocket(socket, state)) 11 ws.on_upgrade(move |socket| websocket(socket, state))
12}
13
14pub async fn websocket(mut socket: WebSocket, state: Arc<AppState>) {
15 trace!("wait for ws message (uuid)");
16 let msg = socket.recv().await;
17 let uuid = msg.unwrap().unwrap().into_text().unwrap();
18
19 trace!("Search for uuid: {}", uuid);
20
21 let eta = get_eta(&state.db).await;
22 let _ = socket
23 .send(Message::Text(format!("eta_{eta}_{uuid}")))
24 .await;
25
26 let device_exists = state.ping_map.contains_key(&uuid);
27 if device_exists {
28 let _ = socket
29 .send(receive_ping_broadcast(state.clone(), uuid).await)
30 .await;
31 } else {
32 debug!("didn't find any device");
33 let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await;
34 };
35
36 let _ = socket.close().await;
37}
38
39async fn receive_ping_broadcast(state: Arc<AppState>, uuid: String) -> Message {
40 let pm = state.ping_map.clone().into_read_only();
41 let device = pm.get(&uuid).expect("fatal error");
42 debug!("got device: {} (online: {})", device.ip, device.online);
43 if device.online {
44 debug!("already started");
45 Message::Text(BroadcastCommand::success(uuid).to_string())
46 } else {
47 loop {
48 trace!("wait for tx message");
49 let message = state
50 .ping_send
51 .subscribe()
52 .recv()
53 .await
54 .expect("fatal error");
55 trace!("got message {:?}", message);
56
57 if message.uuid != uuid {
58 continue;
59 }
60 trace!("message == uuid success");
61 return Message::Text(message.to_string());
62 }
63 }
64}
65
66async fn get_eta(db: &PgPool) -> i64 {
67 let query = sqlx::query!(r#"SELECT times FROM devices;"#)
68 .fetch_one(db)
69 .await
70 .unwrap();
71
72 let times = if let Some(times) = query.times {
73 times
74 } else {
75 vec![0]
76 };
77
78 times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap()
10} 79}
diff --git a/src/services/mod.rs b/src/services.rs
index a766209..a766209 100644
--- a/src/services/mod.rs
+++ b/src/services.rs
diff --git a/src/services/ping.rs b/src/services/ping.rs
index 9b164c8..9191f86 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -1,59 +1,58 @@
1use std::str::FromStr; 1use crate::config::Config;
2use std::net::IpAddr; 2use crate::db::Device;
3use std::sync::Arc;
4
5use axum::extract::ws::WebSocket;
6use axum::extract::ws::Message;
7use dashmap::DashMap; 3use dashmap::DashMap;
4use ipnetwork::IpNetwork;
8use sqlx::PgPool; 5use sqlx::PgPool;
6use std::fmt::Display;
9use time::{Duration, Instant}; 7use time::{Duration, Instant};
10use tokio::sync::broadcast::Sender; 8use tokio::sync::broadcast::Sender;
11use tracing::{debug, error, trace}; 9use tracing::{debug, error, trace};
12use crate::AppState;
13use crate::config::Config;
14use crate::db::Device;
15 10
16pub type StatusMap = DashMap<String, Value>; 11pub type StatusMap = DashMap<String, Value>;
17 12
18#[derive(Debug, Clone)] 13#[derive(Debug, Clone)]
19pub struct Value { 14pub struct Value {
20 pub ip: String, 15 pub ip: IpNetwork,
21 pub online: bool 16 pub online: bool,
22} 17}
23 18
24pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Device, uuid: String, ping_map: &StatusMap, db: &PgPool) { 19pub async fn spawn(
20 tx: Sender<BroadcastCommand>,
21 config: &Config,
22 device: Device,
23 uuid: String,
24 ping_map: &StatusMap,
25 db: &PgPool,
26) {
25 let timer = Instant::now(); 27 let timer = Instant::now();
26 let payload = [0; 8]; 28 let payload = [0; 8];
27 29
28 let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); 30 let mut msg: Option<BroadcastCommand> = None;
29
30 let mut msg: Option<BroadcastCommands> = None;
31 while msg.is_none() { 31 while msg.is_none() {
32 let ping = surge_ping::ping( 32 let ping = surge_ping::ping(device.ip.ip(), &payload).await;
33 ping_ip,
34 &payload
35 ).await;
36 33
37 if let Err(ping) = ping { 34 if let Err(ping) = ping {
38 let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. }); 35 let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. });
39 if !ping_timeout { 36 if !ping_timeout {
40 error!("{}", ping.to_string()); 37 error!("{}", ping.to_string());
41 msg = Some(BroadcastCommands::Error(uuid.clone())); 38 msg = Some(BroadcastCommand::error(uuid.clone()));
42 } 39 }
43 if timer.elapsed() >= Duration::minutes(config.pingtimeout) { 40 if timer.elapsed() >= Duration::minutes(config.pingtimeout) {
44 msg = Some(BroadcastCommands::Timeout(uuid.clone())); 41 msg = Some(BroadcastCommand::timeout(uuid.clone()));
45 } 42 }
46 } else { 43 } else {
47 let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); 44 let (_, duration) = ping
45 .map_err(|err| error!("{}", err.to_string()))
46 .expect("fatal error");
48 debug!("ping took {:?}", duration); 47 debug!("ping took {:?}", duration);
49 msg = Some(BroadcastCommands::Success(uuid.clone())); 48 msg = Some(BroadcastCommand::success(uuid.clone()));
50 }; 49 };
51 } 50 }
52 51
53 let msg = msg.expect("fatal error"); 52 let msg = msg.expect("fatal error");
54 53
55 let _ = tx.send(msg.clone()); 54 let _ = tx.send(msg.clone());
56 if let BroadcastCommands::Success(..) = msg { 55 if let BroadcastCommands::Success = msg.command {
57 sqlx::query!( 56 sqlx::query!(
58 r#" 57 r#"
59 UPDATE devices 58 UPDATE devices
@@ -62,8 +61,17 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Devic
62 "#, 61 "#,
63 timer.elapsed().whole_seconds(), 62 timer.elapsed().whole_seconds(),
64 device.id 63 device.id
65 ).execute(db).await.unwrap(); 64 )
66 ping_map.insert(uuid.clone(), Value { ip: device.ip.clone(), online: true }); 65 .execute(db)
66 .await
67 .unwrap();
68 ping_map.insert(
69 uuid.clone(),
70 Value {
71 ip: device.ip,
72 online: true,
73 },
74 );
67 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 75 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
68 } 76 }
69 trace!("remove {} from ping_map", uuid); 77 trace!("remove {} from ping_map", uuid);
@@ -72,74 +80,48 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Devic
72 80
73#[derive(Clone, Debug, PartialEq)] 81#[derive(Clone, Debug, PartialEq)]
74pub enum BroadcastCommands { 82pub enum BroadcastCommands {
75 Success(String), 83 Success,
76 Timeout(String), 84 Timeout,
77 Error(String), 85 Error,
78} 86}
79 87
80pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { 88#[derive(Clone, Debug, PartialEq)]
81 trace!("wait for ws message (uuid)"); 89pub struct BroadcastCommand {
82 let msg = socket.recv().await; 90 pub uuid: String,
83 let uuid = msg.unwrap().unwrap().into_text().unwrap(); 91 pub command: BroadcastCommands,
84 92}
85 trace!("Search for uuid: {}", uuid);
86
87 let eta = get_eta(&state.db).await;
88 let _ = socket.send(Message::Text(format!("eta_{eta}_{uuid}"))).await;
89 93
90 let device_exists = state.ping_map.contains_key(&uuid); 94impl Display for BroadcastCommand {
91 if device_exists { 95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 let _ = socket.send(process_device(state.clone(), uuid).await).await; 96 let prefix = match self.command {
93 } else { 97 BroadcastCommands::Success => "start",
94 debug!("didn't find any device"); 98 BroadcastCommands::Timeout => "timeout",
95 let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; 99 BroadcastCommands::Error => "error",
96 }; 100 };
97 101
98 let _ = socket.close().await; 102 f.write_str(format!("{prefix}_{}", self.uuid).as_str())
103 }
99} 104}
100 105
101async fn get_eta(db: &PgPool) -> i64 { 106impl BroadcastCommand {
102 let query = sqlx::query!( 107 pub fn success(uuid: String) -> Self {
103 r#"SELECT times FROM devices;"# 108 Self {
104 ).fetch_one(db).await.unwrap(); 109 uuid,
105 110 command: BroadcastCommands::Success,
106 let times = match query.times { 111 }
107 None => { vec![0] }, 112 }
108 Some(t) => t,
109 };
110 times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap()
111 113
112} 114 pub fn timeout(uuid: String) -> Self {
115 Self {
116 uuid,
117 command: BroadcastCommands::Timeout,
118 }
119 }
113 120
114async fn process_device(state: Arc<AppState>, uuid: String) -> Message { 121 pub fn error(uuid: String) -> Self {
115 let pm = state.ping_map.clone().into_read_only(); 122 Self {
116 let device = pm.get(&uuid).expect("fatal error"); 123 uuid,
117 debug!("got device: {} (online: {})", device.ip, device.online); 124 command: BroadcastCommands::Error,
118 if device.online {
119 debug!("already started");
120 Message::Text(format!("start_{uuid}"))
121 } else {
122 loop {
123 trace!("wait for tx message");
124 let message = state.ping_send.subscribe().recv().await.expect("fatal error");
125 trace!("got message {:?}", message);
126 return match message {
127 BroadcastCommands::Success(msg_uuid) => {
128 if msg_uuid != uuid { continue; }
129 trace!("message == uuid success");
130 Message::Text(format!("start_{uuid}"))
131 },
132 BroadcastCommands::Timeout(msg_uuid) => {
133 if msg_uuid != uuid { continue; }
134 trace!("message == uuid timeout");
135 Message::Text(format!("timeout_{uuid}"))
136 },
137 BroadcastCommands::Error(msg_uuid) => {
138 if msg_uuid != uuid { continue; }
139 trace!("message == uuid error");
140 Message::Text(format!("error_{uuid}"))
141 }
142 }
143 } 125 }
144 } 126 }
145} 127}