diff options
-rw-r--r-- | src/error.rs | 20 | ||||
-rw-r--r-- | src/main.rs | 5 | ||||
-rw-r--r-- | src/routes/start.rs | 5 | ||||
-rw-r--r-- | src/routes/status.rs | 2 | ||||
-rw-r--r-- | src/services/ping.rs | 68 |
5 files changed, 60 insertions, 40 deletions
diff --git a/src/error.rs b/src/error.rs index f143ee9..1592a78 100644 --- a/src/error.rs +++ b/src/error.rs | |||
@@ -10,19 +10,20 @@ use crate::auth::AuthError; | |||
10 | #[derive(Debug)] | 10 | #[derive(Debug)] |
11 | pub enum WebolError { | 11 | pub enum WebolError { |
12 | Generic, | 12 | Generic, |
13 | // User(UserError), | ||
13 | Auth(AuthError), | 14 | Auth(AuthError), |
14 | Ping(surge_ping::SurgeError), | 15 | Ping(surge_ping::SurgeError), |
15 | DB(sqlx::Error), | 16 | DB(sqlx::Error), |
16 | IpParse(<std::net::IpAddr as std::str::FromStr>::Err), | 17 | IpParse(<std::net::IpAddr as std::str::FromStr>::Err), |
17 | BufferParse(std::num::ParseIntError), | 18 | BufferParse(std::num::ParseIntError), |
18 | Broadcast(io::Error), | 19 | Broadcast(io::Error), |
19 | Axum(axum::Error) | ||
20 | } | 20 | } |
21 | 21 | ||
22 | impl IntoResponse for WebolError { | 22 | impl IntoResponse for WebolError { |
23 | fn into_response(self) -> Response { | 23 | fn into_response(self) -> Response { |
24 | let (status, error_message) = match self { | 24 | let (status, error_message) = match self { |
25 | Self::Auth(err) => err.get(), | 25 | Self::Auth(err) => err.get(), |
26 | // Self::User(err) => err.get(), | ||
26 | Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), | 27 | Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), |
27 | Self::Ping(err) => { | 28 | Self::Ping(err) => { |
28 | error!("Ping: {}", err.source().unwrap()); | 29 | error!("Ping: {}", err.source().unwrap()); |
@@ -44,10 +45,6 @@ impl IntoResponse for WebolError { | |||
44 | error!("server error: {}", err.to_string()); | 45 | error!("server error: {}", err.to_string()); |
45 | (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") | 46 | (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") |
46 | }, | 47 | }, |
47 | Self::Axum(err) => { | ||
48 | error!("server error: {}", err.to_string()); | ||
49 | (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") | ||
50 | }, | ||
51 | }; | 48 | }; |
52 | let body = Json(json!({ | 49 | let body = Json(json!({ |
53 | "error": error_message, | 50 | "error": error_message, |
@@ -55,3 +52,16 @@ impl IntoResponse for WebolError { | |||
55 | (status, body).into_response() | 52 | (status, body).into_response() |
56 | } | 53 | } |
57 | } | 54 | } |
55 | |||
56 | // #[derive(Debug)] | ||
57 | // pub enum UserError { | ||
58 | // UnknownUUID, | ||
59 | // } | ||
60 | // | ||
61 | // impl UserError { | ||
62 | // pub fn get(self) -> (StatusCode, &'static str) { | ||
63 | // match self { | ||
64 | // Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"), | ||
65 | // } | ||
66 | // } | ||
67 | // } | ||
diff --git a/src/main.rs b/src/main.rs index 854b59d..545d8fe 100644 --- a/src/main.rs +++ b/src/main.rs | |||
@@ -14,6 +14,7 @@ use crate::db::init_db_pool; | |||
14 | use crate::routes::device::{get_device, post_device, put_device}; | 14 | use crate::routes::device::{get_device, post_device, put_device}; |
15 | use crate::routes::start::start; | 15 | use crate::routes::start::start; |
16 | use crate::routes::status::status; | 16 | use crate::routes::status::status; |
17 | use crate::services::ping::{BroadcastCommands, PingMap}; | ||
17 | 18 | ||
18 | mod auth; | 19 | mod auth; |
19 | mod config; | 20 | mod config; |
@@ -72,6 +73,6 @@ async fn main() { | |||
72 | 73 | ||
73 | pub struct AppState { | 74 | pub struct AppState { |
74 | db: PgPool, | 75 | db: PgPool, |
75 | ping_send: Sender<String>, | 76 | ping_send: Sender<BroadcastCommands>, |
76 | ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>, | 77 | ping_map: PingMap, |
77 | } \ No newline at end of file | 78 | } \ No newline at end of file |
diff --git a/src/routes/start.rs b/src/routes/start.rs index 45e7ec8..b1c8a73 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs | |||
@@ -44,15 +44,16 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap | |||
44 | let uuid = if payload.ping.is_some_and(|ping| ping) { | 44 | let uuid = if payload.ping.is_some_and(|ping| ping) { |
45 | let uuid_gen = Uuid::new_v4().to_string(); | 45 | let uuid_gen = Uuid::new_v4().to_string(); |
46 | let uuid_genc = uuid_gen.clone(); | 46 | let uuid_genc = uuid_gen.clone(); |
47 | let uuid_gencc = uuid_gen.clone(); | ||
47 | tokio::spawn(async move{ | 48 | tokio::spawn(async move{ |
48 | debug!("Init ping service"); | 49 | debug!("Init ping service"); |
49 | state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); | 50 | state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); |
50 | 51 | ||
51 | warn!("{:?}", state.ping_map); | 52 | warn!("{:?}", state.ping_map); |
52 | 53 | ||
53 | crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; | 54 | crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await |
54 | }); | 55 | }); |
55 | Some(uuid_genc) | 56 | Some(uuid_gencc) |
56 | } else { None }; | 57 | } else { None }; |
57 | Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) | 58 | Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) |
58 | } else { | 59 | } else { |
diff --git a/src/routes/status.rs b/src/routes/status.rs index cdecf6a..4a5ec67 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs | |||
@@ -1,12 +1,10 @@ | |||
1 | use std::sync::Arc; | 1 | use std::sync::Arc; |
2 | use axum::extract::{State, WebSocketUpgrade}; | 2 | use axum::extract::{State, WebSocketUpgrade}; |
3 | use axum::response::Response; | 3 | use axum::response::Response; |
4 | use serde::Deserialize; | ||
5 | use crate::AppState; | 4 | use crate::AppState; |
6 | use crate::services::ping::status_websocket; | 5 | use crate::services::ping::status_websocket; |
7 | 6 | ||
8 | #[axum_macros::debug_handler] | 7 | #[axum_macros::debug_handler] |
9 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { | 8 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { |
10 | // TODO: remove unwrap | ||
11 | ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) | 9 | ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) |
12 | } \ No newline at end of file | 10 | } \ No newline at end of file |
diff --git a/src/services/ping.rs b/src/services/ping.rs index e3d465d..6835fc0 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -3,16 +3,19 @@ use std::collections::HashMap; | |||
3 | use std::sync::Arc; | 3 | use std::sync::Arc; |
4 | 4 | ||
5 | use axum::extract::{ws::WebSocket}; | 5 | use axum::extract::{ws::WebSocket}; |
6 | use axum::extract::ws::Message; | 6 | use axum::extract::ws::{CloseFrame, Message}; |
7 | use tokio::sync::broadcast::{Sender}; | 7 | use tokio::sync::broadcast::{Sender}; |
8 | use tokio::sync::Mutex; | 8 | use tokio::sync::Mutex; |
9 | use tracing::{debug, error, trace, warn}; | 9 | use tracing::{debug, trace, warn}; |
10 | 10 | ||
11 | use crate::error::WebolError; | 11 | use crate::error::WebolError; |
12 | 12 | ||
13 | pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> { | 13 | pub type PingMap = Arc<Mutex<HashMap<String, (String, bool)>>>; |
14 | |||
15 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { | ||
14 | let payload = [0; 8]; | 16 | let payload = [0; 8]; |
15 | 17 | ||
18 | // TODO: Better while | ||
16 | let mut cont = true; | 19 | let mut cont = true; |
17 | while cont { | 20 | while cont { |
18 | let ping = surge_ping::ping( | 21 | let ping = surge_ping::ping( |
@@ -22,40 +25,44 @@ pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> { | |||
22 | 25 | ||
23 | if let Err(ping) = ping { | 26 | if let Err(ping) = ping { |
24 | cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); | 27 | cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); |
25 | |||
26 | // debug!("{}", cont); | ||
27 | |||
28 | if !cont { | 28 | if !cont { |
29 | return Err(ping).map_err(WebolError::Ping) | 29 | return Err(ping).map_err(WebolError::Ping) |
30 | } | 30 | } |
31 | |||
32 | } else { | 31 | } else { |
33 | let (_, duration) = ping.unwrap(); | 32 | let (_, duration) = ping.unwrap(); |
34 | debug!("Ping took {:?}", duration); | 33 | debug!("Ping took {:?}", duration); |
35 | cont = false; | 34 | cont = false; |
36 | // FIXME: remove unwrap | 35 | handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; |
37 | // FIXME: if error: SendError because no listener, then handle the entry directly | ||
38 | tx.send(ip.clone()); | ||
39 | }; | 36 | }; |
40 | } | 37 | } |
41 | 38 | ||
42 | Ok(()) | 39 | Ok(()) |
43 | } | 40 | } |
44 | 41 | ||
45 | // FIXME: Handle commands through enum | 42 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: PingMap, uuid: String) { |
46 | pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>) { | 43 | debug!("sending pingsuccess message"); |
47 | warn!("{:?}", ping_map); | 44 | ping_map.lock().await.insert(uuid.clone(), (ip.clone(), true)); |
45 | let _ = tx.send(BroadcastCommands::PingSuccess(ip)); | ||
46 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | ||
47 | trace!("remove {} from ping_map", uuid); | ||
48 | ping_map.lock().await.remove(&uuid); | ||
49 | } | ||
48 | 50 | ||
49 | let mut uuid: Option<String> = None; | 51 | #[derive(Clone, Debug)] |
52 | pub enum BroadcastCommands { | ||
53 | PingSuccess(String) | ||
54 | } | ||
55 | |||
56 | pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommands>, ping_map: PingMap) { | ||
57 | warn!("{:?}", ping_map); | ||
50 | 58 | ||
51 | trace!("wait for ws message (uuid)"); | 59 | trace!("wait for ws message (uuid)"); |
52 | let msg = socket.recv().await; | 60 | let msg = socket.recv().await; |
53 | uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); | 61 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); |
54 | |||
55 | let uuid = uuid.unwrap(); | ||
56 | 62 | ||
57 | trace!("Search for uuid: {:?}", uuid); | 63 | trace!("Search for uuid: {:?}", uuid); |
58 | 64 | ||
65 | // TODO: Handle Error | ||
59 | let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); | 66 | let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); |
60 | 67 | ||
61 | trace!("got device: {:?}", device); | 68 | trace!("got device: {:?}", device); |
@@ -63,29 +70,32 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_ma | |||
63 | match device.1 { | 70 | match device.1 { |
64 | true => { | 71 | true => { |
65 | debug!("already started"); | 72 | debug!("already started"); |
66 | socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | 73 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); |
67 | socket.close().await.unwrap(); | 74 | // socket.close().await.unwrap(); |
75 | socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | ||
68 | }, | 76 | }, |
69 | false => { | 77 | false => { |
70 | let ip = device.0.to_owned(); | 78 | let ip = device.0.to_owned(); |
71 | let mut i = 0; | ||
72 | loop{ | 79 | loop{ |
73 | trace!("{}", i); | ||
74 | // TODO: Check if older than 10 minutes, close if true | ||
75 | trace!("wait for tx message"); | 80 | trace!("wait for tx message"); |
76 | let message = tx.subscribe().recv().await.unwrap(); | 81 | let message = tx.subscribe().recv().await.unwrap(); |
77 | trace!("GOT = {}", message); | 82 | trace!("GOT = {:?}", message); |
78 | if message == ip { | 83 | // if let BroadcastCommands::PingSuccess(msg_ip) = message { |
84 | // if msg_ip == ip { | ||
85 | // trace!("message == ip"); | ||
86 | // break; | ||
87 | // } | ||
88 | // } | ||
89 | let BroadcastCommands::PingSuccess(msg_ip) = message; | ||
90 | if msg_ip == ip { | ||
79 | trace!("message == ip"); | 91 | trace!("message == ip"); |
80 | break; | 92 | break; |
81 | } | 93 | } |
82 | i += 1; | ||
83 | }; | 94 | }; |
84 | 95 | ||
85 | socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | 96 | socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); |
86 | socket.close().await.unwrap(); | 97 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); |
87 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 98 | // socket.close().await.unwrap(); |
88 | ping_map.lock().await.remove(&uuid); | ||
89 | warn!("{:?}", ping_map); | 99 | warn!("{:?}", ping_map); |
90 | } | 100 | } |
91 | } | 101 | } |