diff options
Diffstat (limited to 'src/routes/status.rs')
-rw-r--r-- | src/routes/status.rs | 79 |
1 files changed, 74 insertions, 5 deletions
diff --git a/src/routes/status.rs b/src/routes/status.rs index 31ef996..0e25f7d 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs | |||
@@ -1,10 +1,79 @@ | |||
1 | use std::sync::Arc; | 1 | use crate::services::ping::BroadcastCommand; |
2 | use crate::AppState; | ||
3 | use axum::extract::ws::{Message, WebSocket}; | ||
2 | use axum::extract::{State, WebSocketUpgrade}; | 4 | use axum::extract::{State, WebSocketUpgrade}; |
3 | use axum::response::Response; | 5 | use axum::response::Response; |
4 | use crate::AppState; | 6 | use sqlx::PgPool; |
5 | use crate::services::ping::status_websocket; | 7 | use std::sync::Arc; |
8 | use tracing::{debug, trace}; | ||
6 | 9 | ||
7 | #[axum_macros::debug_handler] | ||
8 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { | 10 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { |
9 | ws.on_upgrade(move |socket| status_websocket(socket, state)) | 11 | ws.on_upgrade(move |socket| websocket(socket, state)) |
12 | } | ||
13 | |||
14 | pub async fn websocket(mut socket: WebSocket, state: Arc<AppState>) { | ||
15 | trace!("wait for ws message (uuid)"); | ||
16 | let msg = socket.recv().await; | ||
17 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | ||
18 | |||
19 | trace!("Search for uuid: {}", uuid); | ||
20 | |||
21 | let eta = get_eta(&state.db).await; | ||
22 | let _ = socket | ||
23 | .send(Message::Text(format!("eta_{eta}_{uuid}"))) | ||
24 | .await; | ||
25 | |||
26 | let device_exists = state.ping_map.contains_key(&uuid); | ||
27 | if device_exists { | ||
28 | let _ = socket | ||
29 | .send(receive_ping_broadcast(state.clone(), uuid).await) | ||
30 | .await; | ||
31 | } else { | ||
32 | debug!("didn't find any device"); | ||
33 | let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; | ||
34 | }; | ||
35 | |||
36 | let _ = socket.close().await; | ||
37 | } | ||
38 | |||
39 | async fn receive_ping_broadcast(state: Arc<AppState>, uuid: String) -> Message { | ||
40 | let pm = state.ping_map.clone().into_read_only(); | ||
41 | let device = pm.get(&uuid).expect("fatal error"); | ||
42 | debug!("got device: {} (online: {})", device.ip, device.online); | ||
43 | if device.online { | ||
44 | debug!("already started"); | ||
45 | Message::Text(BroadcastCommand::success(uuid).to_string()) | ||
46 | } else { | ||
47 | loop { | ||
48 | trace!("wait for tx message"); | ||
49 | let message = state | ||
50 | .ping_send | ||
51 | .subscribe() | ||
52 | .recv() | ||
53 | .await | ||
54 | .expect("fatal error"); | ||
55 | trace!("got message {:?}", message); | ||
56 | |||
57 | if message.uuid != uuid { | ||
58 | continue; | ||
59 | } | ||
60 | trace!("message == uuid success"); | ||
61 | return Message::Text(message.to_string()); | ||
62 | } | ||
63 | } | ||
64 | } | ||
65 | |||
66 | async fn get_eta(db: &PgPool) -> i64 { | ||
67 | let query = sqlx::query!(r#"SELECT times FROM devices;"#) | ||
68 | .fetch_one(db) | ||
69 | .await | ||
70 | .unwrap(); | ||
71 | |||
72 | let times = if let Some(times) = query.times { | ||
73 | times | ||
74 | } else { | ||
75 | vec![0] | ||
76 | }; | ||
77 | |||
78 | times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap() | ||
10 | } | 79 | } |