aboutsummaryrefslogtreecommitdiff
path: root/src/routes
diff options
context:
space:
mode:
authorFxQnLr <[email protected]>2024-02-18 21:16:46 +0100
committerFxQnLr <[email protected]>2024-02-18 21:16:46 +0100
commit2f9f18b80a9e2134f674f345e48a5f21de5efadd (patch)
treec4202bb5c1a490233e89d928cf8c5b91258d4c90 /src/routes
parent016fa3a31f8847d3f52800941b7f8fe5ef872240 (diff)
downloadwebol-2f9f18b80a9e2134f674f345e48a5f21de5efadd.tar
webol-2f9f18b80a9e2134f674f345e48a5f21de5efadd.tar.gz
webol-2f9f18b80a9e2134f674f345e48a5f21de5efadd.zip
Refactor stuff. Use Postgres Types
Diffstat (limited to 'src/routes')
-rw-r--r--src/routes/device.rs19
-rw-r--r--src/routes/mod.rs3
-rw-r--r--src/routes/start.rs78
-rw-r--r--src/routes/status.rs79
4 files changed, 128 insertions, 51 deletions
diff --git a/src/routes/device.rs b/src/routes/device.rs
index 5ca574a..2f0093d 100644
--- a/src/routes/device.rs
+++ b/src/routes/device.rs
@@ -4,9 +4,11 @@ use crate::error::Error;
4use axum::extract::State; 4use axum::extract::State;
5use axum::http::HeaderMap; 5use axum::http::HeaderMap;
6use axum::Json; 6use axum::Json;
7use mac_address::MacAddress;
7use serde::{Deserialize, Serialize}; 8use serde::{Deserialize, Serialize};
8use serde_json::{json, Value}; 9use serde_json::{json, Value};
9use std::sync::Arc; 10use sqlx::types::ipnetwork::IpNetwork;
11use std::{sync::Arc, str::FromStr};
10use tracing::{debug, info}; 12use tracing::{debug, info};
11 13
12pub async fn get( 14pub async fn get(
@@ -14,7 +16,7 @@ pub async fn get(
14 headers: HeaderMap, 16 headers: HeaderMap,
15 Json(payload): Json<GetDevicePayload>, 17 Json(payload): Json<GetDevicePayload>,
16) -> Result<Json<Value>, Error> { 18) -> Result<Json<Value>, Error> {
17 info!("add device {}", payload.id); 19 info!("get device {}", payload.id);
18 let secret = headers.get("authorization"); 20 let secret = headers.get("authorization");
19 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); 21 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success);
20 if authorized { 22 if authorized {
@@ -52,18 +54,21 @@ pub async fn put(
52 "add device {} ({}, {}, {})", 54 "add device {} ({}, {}, {})",
53 payload.id, payload.mac, payload.broadcast_addr, payload.ip 55 payload.id, payload.mac, payload.broadcast_addr, payload.ip
54 ); 56 );
57
55 let secret = headers.get("authorization"); 58 let secret = headers.get("authorization");
56 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); 59 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success);
57 if authorized { 60 if authorized {
61 let ip = IpNetwork::from_str(&payload.ip)?;
62 let mac = MacAddress::from_str(&payload.mac)?;
58 sqlx::query!( 63 sqlx::query!(
59 r#" 64 r#"
60 INSERT INTO devices (id, mac, broadcast_addr, ip) 65 INSERT INTO devices (id, mac, broadcast_addr, ip)
61 VALUES ($1, $2, $3, $4); 66 VALUES ($1, $2, $3, $4);
62 "#, 67 "#,
63 payload.id, 68 payload.id,
64 payload.mac, 69 mac,
65 payload.broadcast_addr, 70 payload.broadcast_addr,
66 payload.ip 71 ip
67 ) 72 )
68 .execute(&state.db) 73 .execute(&state.db)
69 .await?; 74 .await?;
@@ -99,6 +104,8 @@ pub async fn post(
99 let secret = headers.get("authorization"); 104 let secret = headers.get("authorization");
100 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); 105 let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success);
101 if authorized { 106 if authorized {
107 let ip = IpNetwork::from_str(&payload.ip)?;
108 let mac = MacAddress::from_str(&payload.mac)?;
102 let device = sqlx::query_as!( 109 let device = sqlx::query_as!(
103 Device, 110 Device,
104 r#" 111 r#"
@@ -106,9 +113,9 @@ pub async fn post(
106 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 113 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4
107 RETURNING id, mac, broadcast_addr, ip, times; 114 RETURNING id, mac, broadcast_addr, ip, times;
108 "#, 115 "#,
109 payload.mac, 116 mac,
110 payload.broadcast_addr, 117 payload.broadcast_addr,
111 payload.ip, 118 ip,
112 payload.id 119 payload.id
113 ) 120 )
114 .fetch_one(&state.db) 121 .fetch_one(&state.db)
diff --git a/src/routes/mod.rs b/src/routes/mod.rs
deleted file mode 100644
index d5ab0d6..0000000
--- a/src/routes/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
1pub mod start;
2pub mod device;
3pub mod status; \ No newline at end of file
diff --git a/src/routes/start.rs b/src/routes/start.rs
index ec4f98f..4888325 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
12use tracing::{debug, info}; 12use tracing::{debug, info};
13use uuid::Uuid; 13use uuid::Uuid;
14 14
15#[axum_macros::debug_handler]
16pub async fn start( 15pub async fn start(
17 State(state): State<Arc<crate::AppState>>, 16 State(state): State<Arc<crate::AppState>>,
18 headers: HeaderMap, 17 headers: HeaderMap,
@@ -41,45 +40,11 @@ pub async fn start(
41 let _ = send_packet( 40 let _ = send_packet(
42 bind_addr, 41 bind_addr,
43 &device.broadcast_addr, 42 &device.broadcast_addr,
44 &create_buffer(&device.mac)?, 43 &create_buffer(&device.mac.to_string())?,
45 )?; 44 )?;
46 let dev_id = device.id.clone(); 45 let dev_id = device.id.clone();
47 let uuid = if payload.ping.is_some_and(|ping| ping) { 46 let uuid = if payload.ping.is_some_and(|ping| ping) {
48 let mut uuid: Option<String> = None; 47 Some(setup_ping(state, device))
49 for (key, value) in state.ping_map.clone() {
50 if value.ip == device.ip {
51 debug!("service already exists");
52 uuid = Some(key);
53 break;
54 }
55 }
56 let uuid_gen = match uuid {
57 Some(u) => u,
58 None => Uuid::new_v4().to_string(),
59 };
60 let uuid_genc = uuid_gen.clone();
61
62 tokio::spawn(async move {
63 debug!("init ping service");
64 state.ping_map.insert(
65 uuid_gen.clone(),
66 PingValue {
67 ip: device.ip.clone(),
68 online: false,
69 },
70 );
71
72 crate::services::ping::spawn(
73 state.ping_send.clone(),
74 &state.config,
75 device,
76 uuid_gen.clone(),
77 &state.ping_map,
78 &state.db,
79 )
80 .await;
81 });
82 Some(uuid_genc)
83 } else { 48 } else {
84 None 49 None
85 }; 50 };
@@ -93,6 +58,45 @@ pub async fn start(
93 } 58 }
94} 59}
95 60
61fn setup_ping(state: Arc<crate::AppState>, device: Device) -> String {
62 let mut uuid: Option<String> = None;
63 for (key, value) in state.ping_map.clone() {
64 if value.ip == device.ip {
65 debug!("service already exists");
66 uuid = Some(key);
67 break;
68 }
69 }
70 let uuid_gen = match uuid {
71 Some(u) => u,
72 None => Uuid::new_v4().to_string(),
73 };
74 let uuid_ret = uuid_gen.clone();
75
76 debug!("init ping service");
77 state.ping_map.insert(
78 uuid_gen.clone(),
79 PingValue {
80 ip: device.ip,
81 online: false,
82 },
83 );
84
85 tokio::spawn(async move {
86 crate::services::ping::spawn(
87 state.ping_send.clone(),
88 &state.config,
89 device,
90 uuid_gen,
91 &state.ping_map,
92 &state.db,
93 )
94 .await;
95 });
96
97 uuid_ret
98}
99
96#[derive(Deserialize)] 100#[derive(Deserialize)]
97pub struct Payload { 101pub struct Payload {
98 id: String, 102 id: String,
diff --git a/src/routes/status.rs b/src/routes/status.rs
index 31ef996..0e25f7d 100644
--- a/src/routes/status.rs
+++ b/src/routes/status.rs
@@ -1,10 +1,79 @@
1use std::sync::Arc; 1use crate::services::ping::BroadcastCommand;
2use crate::AppState;
3use axum::extract::ws::{Message, WebSocket};
2use axum::extract::{State, WebSocketUpgrade}; 4use axum::extract::{State, WebSocketUpgrade};
3use axum::response::Response; 5use axum::response::Response;
4use crate::AppState; 6use sqlx::PgPool;
5use crate::services::ping::status_websocket; 7use std::sync::Arc;
8use tracing::{debug, trace};
6 9
7#[axum_macros::debug_handler]
8pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { 10pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response {
9 ws.on_upgrade(move |socket| status_websocket(socket, state)) 11 ws.on_upgrade(move |socket| websocket(socket, state))
12}
13
14pub async fn websocket(mut socket: WebSocket, state: Arc<AppState>) {
15 trace!("wait for ws message (uuid)");
16 let msg = socket.recv().await;
17 let uuid = msg.unwrap().unwrap().into_text().unwrap();
18
19 trace!("Search for uuid: {}", uuid);
20
21 let eta = get_eta(&state.db).await;
22 let _ = socket
23 .send(Message::Text(format!("eta_{eta}_{uuid}")))
24 .await;
25
26 let device_exists = state.ping_map.contains_key(&uuid);
27 if device_exists {
28 let _ = socket
29 .send(receive_ping_broadcast(state.clone(), uuid).await)
30 .await;
31 } else {
32 debug!("didn't find any device");
33 let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await;
34 };
35
36 let _ = socket.close().await;
37}
38
39async fn receive_ping_broadcast(state: Arc<AppState>, uuid: String) -> Message {
40 let pm = state.ping_map.clone().into_read_only();
41 let device = pm.get(&uuid).expect("fatal error");
42 debug!("got device: {} (online: {})", device.ip, device.online);
43 if device.online {
44 debug!("already started");
45 Message::Text(BroadcastCommand::success(uuid).to_string())
46 } else {
47 loop {
48 trace!("wait for tx message");
49 let message = state
50 .ping_send
51 .subscribe()
52 .recv()
53 .await
54 .expect("fatal error");
55 trace!("got message {:?}", message);
56
57 if message.uuid != uuid {
58 continue;
59 }
60 trace!("message == uuid success");
61 return Message::Text(message.to_string());
62 }
63 }
64}
65
66async fn get_eta(db: &PgPool) -> i64 {
67 let query = sqlx::query!(r#"SELECT times FROM devices;"#)
68 .fetch_one(db)
69 .await
70 .unwrap();
71
72 let times = if let Some(times) = query.times {
73 times
74 } else {
75 vec![0]
76 };
77
78 times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap()
10} 79}