aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFxQnLr <[email protected]>2023-10-29 21:09:46 +0100
committerFxQnLr <[email protected]>2023-10-29 21:09:46 +0100
commitdd303dc41e4d500e48760f79351720cc2c1c9ffe (patch)
treeb730a1da1e0e58e12494bbab88076d403f4ccd97 /src
parent84c32953ae5f52be44af4b48381747f55cb04f4a (diff)
downloadwebol-dd303dc41e4d500e48760f79351720cc2c1c9ffe.tar
webol-dd303dc41e4d500e48760f79351720cc2c1c9ffe.tar.gz
webol-dd303dc41e4d500e48760f79351720cc2c1c9ffe.zip
add ip to database and use for ping, remove arc from pingmap
Diffstat (limited to 'src')
-rw-r--r--src/db.rs3
-rw-r--r--src/main.rs2
-rw-r--r--src/routes/device.rs16
-rw-r--r--src/routes/start.rs11
-rw-r--r--src/routes/status.rs2
-rw-r--r--src/services/ping.rs20
6 files changed, 30 insertions, 24 deletions
diff --git a/src/db.rs b/src/db.rs
index 3c51e2b..51ea469 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -12,7 +12,8 @@ use crate::config::SETTINGS;
12pub struct Device { 12pub struct Device {
13 pub id: String, 13 pub id: String,
14 pub mac: String, 14 pub mac: String,
15 pub broadcast_addr: String 15 pub broadcast_addr: String,
16 pub ip: String
16} 17}
17 18
18pub async fn init_db_pool() -> PgPool { 19pub async fn init_db_pool() -> PgPool {
diff --git a/src/main.rs b/src/main.rs
index 762a817..ee540af 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -52,7 +52,7 @@ async fn main() {
52 52
53 let ping_map: DashMap<String, (String, bool)> = DashMap::new(); 53 let ping_map: DashMap<String, (String, bool)> = DashMap::new();
54 54
55 let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(ping_map) }); 55 let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map });
56 56
57 let app = Router::new() 57 let app = Router::new()
58 .route("/start", post(start)) 58 .route("/start", post(start))
diff --git a/src/routes/device.rs b/src/routes/device.rs
index 248d1e0..7353733 100644
--- a/src/routes/device.rs
+++ b/src/routes/device.rs
@@ -16,7 +16,7 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head
16 let device = sqlx::query_as!( 16 let device = sqlx::query_as!(
17 Device, 17 Device,
18 r#" 18 r#"
19 SELECT id, mac, broadcast_addr 19 SELECT id, mac, broadcast_addr, ip
20 FROM devices 20 FROM devices
21 WHERE id = $1; 21 WHERE id = $1;
22 "#, 22 "#,
@@ -40,12 +40,13 @@ pub async fn put_device(State(state): State<Arc<crate::AppState>>, headers: Head
40 if auth(secret).map_err(WebolError::Auth)? { 40 if auth(secret).map_err(WebolError::Auth)? {
41 sqlx::query!( 41 sqlx::query!(
42 r#" 42 r#"
43 INSERT INTO devices (id, mac, broadcast_addr) 43 INSERT INTO devices (id, mac, broadcast_addr, ip)
44 VALUES ($1, $2, $3); 44 VALUES ($1, $2, $3, $4);
45 "#, 45 "#,
46 payload.id, 46 payload.id,
47 payload.mac, 47 payload.mac,
48 payload.broadcast_addr 48 payload.broadcast_addr,
49 payload.ip
49 ).execute(&state.db).await.map_err(WebolError::DB)?; 50 ).execute(&state.db).await.map_err(WebolError::DB)?;
50 51
51 Ok(Json(json!(PutDeviceResponse { success: true }))) 52 Ok(Json(json!(PutDeviceResponse { success: true })))
@@ -59,6 +60,7 @@ pub struct PutDevicePayload {
59 id: String, 60 id: String,
60 mac: String, 61 mac: String,
61 broadcast_addr: String, 62 broadcast_addr: String,
63 ip: String
62} 64}
63 65
64#[derive(Serialize)] 66#[derive(Serialize)]
@@ -74,11 +76,12 @@ pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: Hea
74 Device, 76 Device,
75 r#" 77 r#"
76 UPDATE devices 78 UPDATE devices
77 SET mac = $1, broadcast_addr = $2 WHERE id = $3 79 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4
78 RETURNING id, mac, broadcast_addr; 80 RETURNING id, mac, broadcast_addr, ip;
79 "#, 81 "#,
80 payload.mac, 82 payload.mac,
81 payload.broadcast_addr, 83 payload.broadcast_addr,
84 payload.ip,
82 payload.id 85 payload.id
83 ).fetch_one(&state.db).await.map_err(WebolError::DB)?; 86 ).fetch_one(&state.db).await.map_err(WebolError::DB)?;
84 87
@@ -93,4 +96,5 @@ pub struct PostDevicePayload {
93 id: String, 96 id: String,
94 mac: String, 97 mac: String,
95 broadcast_addr: String, 98 broadcast_addr: String,
99 ip: String,
96} 100}
diff --git a/src/routes/start.rs b/src/routes/start.rs
index 5b73281..3bccb0f 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -22,7 +22,7 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
22 let device = sqlx::query_as!( 22 let device = sqlx::query_as!(
23 Device, 23 Device,
24 r#" 24 r#"
25 SELECT id, mac, broadcast_addr 25 SELECT id, mac, broadcast_addr, ip
26 FROM devices 26 FROM devices
27 WHERE id = $1; 27 WHERE id = $1;
28 "#, 28 "#,
@@ -44,16 +44,15 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
44 let uuid = if payload.ping.is_some_and(|ping| ping) { 44 let uuid = if payload.ping.is_some_and(|ping| ping) {
45 let uuid_gen = Uuid::new_v4().to_string(); 45 let uuid_gen = Uuid::new_v4().to_string();
46 let uuid_genc = uuid_gen.clone(); 46 let uuid_genc = uuid_gen.clone();
47 let uuid_gencc = uuid_gen.clone(); 47 tokio::spawn(async move {
48 tokio::spawn(async move{
49 debug!("Init ping service"); 48 debug!("Init ping service");
50 state.ping_map.insert(uuid_gen, ("192.168.178.94".to_string(), false)); 49 state.ping_map.insert(uuid_gen.clone(), (device.ip.clone(), false));
51 50
52 warn!("{:?}", state.ping_map); 51 warn!("{:?}", state.ping_map);
53 52
54 crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await 53 crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await
55 }); 54 });
56 Some(uuid_gencc) 55 Some(uuid_genc)
57 } else { None }; 56 } else { None };
58 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) 57 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid })))
59 } else { 58 } else {
diff --git a/src/routes/status.rs b/src/routes/status.rs
index 4a5ec67..45f3e51 100644
--- a/src/routes/status.rs
+++ b/src/routes/status.rs
@@ -6,5 +6,5 @@ use crate::services::ping::status_websocket;
6 6
7#[axum_macros::debug_handler] 7#[axum_macros::debug_handler]
8pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { 8pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response {
9 ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) 9 ws.on_upgrade(move |socket| status_websocket(socket, state))
10} \ No newline at end of file 10} \ No newline at end of file
diff --git a/src/services/ping.rs b/src/services/ping.rs
index ed848fc..04ad511 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -6,12 +6,13 @@ use axum::extract::ws::{CloseFrame, Message};
6use dashmap::DashMap; 6use dashmap::DashMap;
7use tokio::sync::broadcast::{Sender}; 7use tokio::sync::broadcast::{Sender};
8use tracing::{debug, trace, warn}; 8use tracing::{debug, trace, warn};
9use crate::AppState;
9 10
10use crate::error::WebolError; 11use crate::error::WebolError;
11 12
12pub type PingMap = Arc<DashMap<String, (String, bool)>>; 13pub type PingMap = DashMap<String, (String, bool)>;
13 14
14pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { 15pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> {
15 let payload = [0; 8]; 16 let payload = [0; 8];
16 17
17 // TODO: Better while 18 // TODO: Better while
@@ -31,14 +32,14 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping
31 let (_, duration) = ping.unwrap(); 32 let (_, duration) = ping.unwrap();
32 debug!("Ping took {:?}", duration); 33 debug!("Ping took {:?}", duration);
33 cont = false; 34 cont = false;
34 handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; 35 handle_broadcast_send(&tx, ip.clone(), &ping_map, uuid.clone()).await;
35 }; 36 };
36 } 37 }
37 38
38 Ok(()) 39 Ok(())
39} 40}
40 41
41async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: PingMap, uuid: String) { 42async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) {
42 debug!("sending pingsuccess message"); 43 debug!("sending pingsuccess message");
43 ping_map.insert(uuid.clone(), (ip.clone(), true)); 44 ping_map.insert(uuid.clone(), (ip.clone(), true));
44 let _ = tx.send(BroadcastCommands::PingSuccess(ip)); 45 let _ = tx.send(BroadcastCommands::PingSuccess(ip));
@@ -52,8 +53,8 @@ pub enum BroadcastCommands {
52 PingSuccess(String) 53 PingSuccess(String)
53} 54}
54 55
55pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommands>, ping_map: PingMap) { 56pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
56 warn!("{:?}", ping_map); 57 warn!("{:?}", state.ping_map);
57 58
58 trace!("wait for ws message (uuid)"); 59 trace!("wait for ws message (uuid)");
59 let msg = socket.recv().await; 60 let msg = socket.recv().await;
@@ -62,13 +63,14 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand
62 trace!("Search for uuid: {:?}", uuid); 63 trace!("Search for uuid: {:?}", uuid);
63 64
64 // TODO: Handle Error 65 // TODO: Handle Error
65 let device = ping_map.get(&uuid).unwrap().to_owned(); 66 let device = state.ping_map.get(&uuid).unwrap().to_owned();
66 67
67 trace!("got device: {:?}", device); 68 trace!("got device: {:?}", device);
68 69
69 match device.1 { 70 match device.1 {
70 true => { 71 true => {
71 debug!("already started"); 72 debug!("already started");
73 // TODO: What's better?
72 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); 74 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
73 // socket.close().await.unwrap(); 75 // socket.close().await.unwrap();
74 socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); 76 socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap();
@@ -77,7 +79,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand
77 let ip = device.0.to_owned(); 79 let ip = device.0.to_owned();
78 loop{ 80 loop{
79 trace!("wait for tx message"); 81 trace!("wait for tx message");
80 let message = tx.subscribe().recv().await.unwrap(); 82 let message = state.ping_send.subscribe().recv().await.unwrap();
81 trace!("GOT = {:?}", message); 83 trace!("GOT = {:?}", message);
82 // if let BroadcastCommands::PingSuccess(msg_ip) = message { 84 // if let BroadcastCommands::PingSuccess(msg_ip) = message {
83 // if msg_ip == ip { 85 // if msg_ip == ip {
@@ -95,7 +97,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand
95 socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); 97 socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap();
96 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); 98 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
97 // socket.close().await.unwrap(); 99 // socket.close().await.unwrap();
98 warn!("{:?}", ping_map); 100 warn!("{:?}", state.ping_map);
99 } 101 }
100 } 102 }
101} \ No newline at end of file 103} \ No newline at end of file