summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFxQnLr <[email protected]>2023-10-29 19:55:26 +0100
committerFxQnLr <[email protected]>2023-10-29 19:55:26 +0100
commit0cca10290d089aabac8f2e4356cfaf80f06ae194 (patch)
tree708d44f2c439bb23b664114e16d92af63c693f3b
parent00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a (diff)
downloadwebol-0cca10290d089aabac8f2e4356cfaf80f06ae194.tar
webol-0cca10290d089aabac8f2e4356cfaf80f06ae194.tar.gz
webol-0cca10290d089aabac8f2e4356cfaf80f06ae194.zip
does what is expected, but badly
-rw-r--r--src/error.rs20
-rw-r--r--src/main.rs5
-rw-r--r--src/routes/start.rs5
-rw-r--r--src/routes/status.rs2
-rw-r--r--src/services/ping.rs68
5 files changed, 60 insertions, 40 deletions
diff --git a/src/error.rs b/src/error.rs
index f143ee9..1592a78 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -10,19 +10,20 @@ use crate::auth::AuthError;
10#[derive(Debug)] 10#[derive(Debug)]
11pub enum WebolError { 11pub enum WebolError {
12 Generic, 12 Generic,
13 // User(UserError),
13 Auth(AuthError), 14 Auth(AuthError),
14 Ping(surge_ping::SurgeError), 15 Ping(surge_ping::SurgeError),
15 DB(sqlx::Error), 16 DB(sqlx::Error),
16 IpParse(<std::net::IpAddr as std::str::FromStr>::Err), 17 IpParse(<std::net::IpAddr as std::str::FromStr>::Err),
17 BufferParse(std::num::ParseIntError), 18 BufferParse(std::num::ParseIntError),
18 Broadcast(io::Error), 19 Broadcast(io::Error),
19 Axum(axum::Error)
20} 20}
21 21
22impl IntoResponse for WebolError { 22impl IntoResponse for WebolError {
23 fn into_response(self) -> Response { 23 fn into_response(self) -> Response {
24 let (status, error_message) = match self { 24 let (status, error_message) = match self {
25 Self::Auth(err) => err.get(), 25 Self::Auth(err) => err.get(),
26 // Self::User(err) => err.get(),
26 Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), 27 Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""),
27 Self::Ping(err) => { 28 Self::Ping(err) => {
28 error!("Ping: {}", err.source().unwrap()); 29 error!("Ping: {}", err.source().unwrap());
@@ -44,10 +45,6 @@ impl IntoResponse for WebolError {
44 error!("server error: {}", err.to_string()); 45 error!("server error: {}", err.to_string());
45 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") 46 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error")
46 }, 47 },
47 Self::Axum(err) => {
48 error!("server error: {}", err.to_string());
49 (StatusCode::INTERNAL_SERVER_ERROR, "Server Error")
50 },
51 }; 48 };
52 let body = Json(json!({ 49 let body = Json(json!({
53 "error": error_message, 50 "error": error_message,
@@ -55,3 +52,16 @@ impl IntoResponse for WebolError {
55 (status, body).into_response() 52 (status, body).into_response()
56 } 53 }
57} 54}
55
56// #[derive(Debug)]
57// pub enum UserError {
58// UnknownUUID,
59// }
60//
61// impl UserError {
62// pub fn get(self) -> (StatusCode, &'static str) {
63// match self {
64// Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"),
65// }
66// }
67// }
diff --git a/src/main.rs b/src/main.rs
index 854b59d..545d8fe 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,6 +14,7 @@ use crate::db::init_db_pool;
14use crate::routes::device::{get_device, post_device, put_device}; 14use crate::routes::device::{get_device, post_device, put_device};
15use crate::routes::start::start; 15use crate::routes::start::start;
16use crate::routes::status::status; 16use crate::routes::status::status;
17use crate::services::ping::{BroadcastCommands, PingMap};
17 18
18mod auth; 19mod auth;
19mod config; 20mod config;
@@ -72,6 +73,6 @@ async fn main() {
72 73
73pub struct AppState { 74pub struct AppState {
74 db: PgPool, 75 db: PgPool,
75 ping_send: Sender<String>, 76 ping_send: Sender<BroadcastCommands>,
76 ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>, 77 ping_map: PingMap,
77} \ No newline at end of file 78} \ No newline at end of file
diff --git a/src/routes/start.rs b/src/routes/start.rs
index 45e7ec8..b1c8a73 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -44,15 +44,16 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
44 let uuid = if payload.ping.is_some_and(|ping| ping) { 44 let uuid = if payload.ping.is_some_and(|ping| ping) {
45 let uuid_gen = Uuid::new_v4().to_string(); 45 let uuid_gen = Uuid::new_v4().to_string();
46 let uuid_genc = uuid_gen.clone(); 46 let uuid_genc = uuid_gen.clone();
47 let uuid_gencc = uuid_gen.clone();
47 tokio::spawn(async move{ 48 tokio::spawn(async move{
48 debug!("Init ping service"); 49 debug!("Init ping service");
49 state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); 50 state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false));
50 51
51 warn!("{:?}", state.ping_map); 52 warn!("{:?}", state.ping_map);
52 53
53 crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; 54 crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await
54 }); 55 });
55 Some(uuid_genc) 56 Some(uuid_gencc)
56 } else { None }; 57 } else { None };
57 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) 58 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid })))
58 } else { 59 } else {
diff --git a/src/routes/status.rs b/src/routes/status.rs
index cdecf6a..4a5ec67 100644
--- a/src/routes/status.rs
+++ b/src/routes/status.rs
@@ -1,12 +1,10 @@
1use std::sync::Arc; 1use std::sync::Arc;
2use axum::extract::{State, WebSocketUpgrade}; 2use axum::extract::{State, WebSocketUpgrade};
3use axum::response::Response; 3use axum::response::Response;
4use serde::Deserialize;
5use crate::AppState; 4use crate::AppState;
6use crate::services::ping::status_websocket; 5use crate::services::ping::status_websocket;
7 6
8#[axum_macros::debug_handler] 7#[axum_macros::debug_handler]
9pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { 8pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response {
10 // TODO: remove unwrap
11 ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) 9 ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone()))
12} \ No newline at end of file 10} \ No newline at end of file
diff --git a/src/services/ping.rs b/src/services/ping.rs
index e3d465d..6835fc0 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -3,16 +3,19 @@ use std::collections::HashMap;
3use std::sync::Arc; 3use std::sync::Arc;
4 4
5use axum::extract::{ws::WebSocket}; 5use axum::extract::{ws::WebSocket};
6use axum::extract::ws::Message; 6use axum::extract::ws::{CloseFrame, Message};
7use tokio::sync::broadcast::{Sender}; 7use tokio::sync::broadcast::{Sender};
8use tokio::sync::Mutex; 8use tokio::sync::Mutex;
9use tracing::{debug, error, trace, warn}; 9use tracing::{debug, trace, warn};
10 10
11use crate::error::WebolError; 11use crate::error::WebolError;
12 12
13pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> { 13pub type PingMap = Arc<Mutex<HashMap<String, (String, bool)>>>;
14
15pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> {
14 let payload = [0; 8]; 16 let payload = [0; 8];
15 17
18 // TODO: Better while
16 let mut cont = true; 19 let mut cont = true;
17 while cont { 20 while cont {
18 let ping = surge_ping::ping( 21 let ping = surge_ping::ping(
@@ -22,40 +25,44 @@ pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> {
22 25
23 if let Err(ping) = ping { 26 if let Err(ping) = ping {
24 cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); 27 cont = matches!(ping, surge_ping::SurgeError::Timeout { .. });
25
26 // debug!("{}", cont);
27
28 if !cont { 28 if !cont {
29 return Err(ping).map_err(WebolError::Ping) 29 return Err(ping).map_err(WebolError::Ping)
30 } 30 }
31
32 } else { 31 } else {
33 let (_, duration) = ping.unwrap(); 32 let (_, duration) = ping.unwrap();
34 debug!("Ping took {:?}", duration); 33 debug!("Ping took {:?}", duration);
35 cont = false; 34 cont = false;
36 // FIXME: remove unwrap 35 handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await;
37 // FIXME: if error: SendError because no listener, then handle the entry directly
38 tx.send(ip.clone());
39 }; 36 };
40 } 37 }
41 38
42 Ok(()) 39 Ok(())
43} 40}
44 41
45// FIXME: Handle commands through enum 42async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: PingMap, uuid: String) {
46pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>) { 43 debug!("sending pingsuccess message");
47 warn!("{:?}", ping_map); 44 ping_map.lock().await.insert(uuid.clone(), (ip.clone(), true));
45 let _ = tx.send(BroadcastCommands::PingSuccess(ip));
46 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
47 trace!("remove {} from ping_map", uuid);
48 ping_map.lock().await.remove(&uuid);
49}
48 50
49 let mut uuid: Option<String> = None; 51#[derive(Clone, Debug)]
52pub enum BroadcastCommands {
53 PingSuccess(String)
54}
55
56pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommands>, ping_map: PingMap) {
57 warn!("{:?}", ping_map);
50 58
51 trace!("wait for ws message (uuid)"); 59 trace!("wait for ws message (uuid)");
52 let msg = socket.recv().await; 60 let msg = socket.recv().await;
53 uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); 61 let uuid = msg.unwrap().unwrap().into_text().unwrap();
54
55 let uuid = uuid.unwrap();
56 62
57 trace!("Search for uuid: {:?}", uuid); 63 trace!("Search for uuid: {:?}", uuid);
58 64
65 // TODO: Handle Error
59 let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); 66 let device = ping_map.lock().await.get(&uuid).unwrap().to_owned();
60 67
61 trace!("got device: {:?}", device); 68 trace!("got device: {:?}", device);
@@ -63,29 +70,32 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_ma
63 match device.1 { 70 match device.1 {
64 true => { 71 true => {
65 debug!("already started"); 72 debug!("already started");
66 socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); 73 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
67 socket.close().await.unwrap(); 74 // socket.close().await.unwrap();
75 socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap();
68 }, 76 },
69 false => { 77 false => {
70 let ip = device.0.to_owned(); 78 let ip = device.0.to_owned();
71 let mut i = 0;
72 loop{ 79 loop{
73 trace!("{}", i);
74 // TODO: Check if older than 10 minutes, close if true
75 trace!("wait for tx message"); 80 trace!("wait for tx message");
76 let message = tx.subscribe().recv().await.unwrap(); 81 let message = tx.subscribe().recv().await.unwrap();
77 trace!("GOT = {}", message); 82 trace!("GOT = {:?}", message);
78 if message == ip { 83 // if let BroadcastCommands::PingSuccess(msg_ip) = message {
84 // if msg_ip == ip {
85 // trace!("message == ip");
86 // break;
87 // }
88 // }
89 let BroadcastCommands::PingSuccess(msg_ip) = message;
90 if msg_ip == ip {
79 trace!("message == ip"); 91 trace!("message == ip");
80 break; 92 break;
81 } 93 }
82 i += 1;
83 }; 94 };
84 95
85 socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); 96 socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap();
86 socket.close().await.unwrap(); 97 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
87 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 98 // socket.close().await.unwrap();
88 ping_map.lock().await.remove(&uuid);
89 warn!("{:?}", ping_map); 99 warn!("{:?}", ping_map);
90 } 100 }
91 } 101 }