aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-1dc5f44967ffdee882f4cef32262fd643b452aacca373ee527c978e816115de6.json (renamed from .sqlx/query-f179f38584f97842cd41159dd2293fa19e5f1975ea8810526924e77683ba92c4.json)12
-rw-r--r--.sqlx/query-25ab5caa92ff80ef3f85ddce1174021ce0fb896a818a6deee291a1eb85ae2a8a.json15
-rw-r--r--.sqlx/query-5ac2b9a76338dd1342938cc09e37b14c828c4ee44b696d83f87f739022be6948.json20
-rw-r--r--.sqlx/query-62c84231c7e9c85dc91d71f6b4f7ee6dae2130c2109fb6f1e47e0990ec395744.json (renamed from .sqlx/query-82c11b5a47389884e4ed945b87dfcee9067933c4df892609700c0766c20fc5c5.json)12
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--migrations/20231009123228_devices.sql3
-rw-r--r--src/db.rs3
-rw-r--r--src/routes/device.rs4
-rw-r--r--src/routes/start.rs9
-rw-r--r--src/services/ping.rs37
11 files changed, 99 insertions, 20 deletions
diff --git a/.sqlx/query-f179f38584f97842cd41159dd2293fa19e5f1975ea8810526924e77683ba92c4.json b/.sqlx/query-1dc5f44967ffdee882f4cef32262fd643b452aacca373ee527c978e816115de6.json
index 322db91..33d524d 100644
--- a/.sqlx/query-f179f38584f97842cd41159dd2293fa19e5f1975ea8810526924e77683ba92c4.json
+++ b/.sqlx/query-1dc5f44967ffdee882f4cef32262fd643b452aacca373ee527c978e816115de6.json
@@ -1,6 +1,6 @@
1{ 1{
2 "db_name": "PostgreSQL", 2 "db_name": "PostgreSQL",
3 "query": "\n UPDATE devices\n SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4\n RETURNING id, mac, broadcast_addr, ip;\n ", 3 "query": "\n UPDATE devices\n SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4\n RETURNING id, mac, broadcast_addr, ip, times;\n ",
4 "describe": { 4 "describe": {
5 "columns": [ 5 "columns": [
6 { 6 {
@@ -22,6 +22,11 @@
22 "ordinal": 3, 22 "ordinal": 3,
23 "name": "ip", 23 "name": "ip",
24 "type_info": "Varchar" 24 "type_info": "Varchar"
25 },
26 {
27 "ordinal": 4,
28 "name": "times",
29 "type_info": "Int8Array"
25 } 30 }
26 ], 31 ],
27 "parameters": { 32 "parameters": {
@@ -36,8 +41,9 @@
36 false, 41 false,
37 false, 42 false,
38 false, 43 false,
39 false 44 false,
45 true
40 ] 46 ]
41 }, 47 },
42 "hash": "f179f38584f97842cd41159dd2293fa19e5f1975ea8810526924e77683ba92c4" 48 "hash": "1dc5f44967ffdee882f4cef32262fd643b452aacca373ee527c978e816115de6"
43} 49}
diff --git a/.sqlx/query-25ab5caa92ff80ef3f85ddce1174021ce0fb896a818a6deee291a1eb85ae2a8a.json b/.sqlx/query-25ab5caa92ff80ef3f85ddce1174021ce0fb896a818a6deee291a1eb85ae2a8a.json
new file mode 100644
index 0000000..71438e3
--- /dev/null
+++ b/.sqlx/query-25ab5caa92ff80ef3f85ddce1174021ce0fb896a818a6deee291a1eb85ae2a8a.json
@@ -0,0 +1,15 @@
1{
2 "db_name": "PostgreSQL",
3 "query": "\n UPDATE devices\n SET times = array_append(times, $1)\n WHERE id = $2;\n ",
4 "describe": {
5 "columns": [],
6 "parameters": {
7 "Left": [
8 "Int8",
9 "Text"
10 ]
11 },
12 "nullable": []
13 },
14 "hash": "25ab5caa92ff80ef3f85ddce1174021ce0fb896a818a6deee291a1eb85ae2a8a"
15}
diff --git a/.sqlx/query-5ac2b9a76338dd1342938cc09e37b14c828c4ee44b696d83f87f739022be6948.json b/.sqlx/query-5ac2b9a76338dd1342938cc09e37b14c828c4ee44b696d83f87f739022be6948.json
new file mode 100644
index 0000000..c385101
--- /dev/null
+++ b/.sqlx/query-5ac2b9a76338dd1342938cc09e37b14c828c4ee44b696d83f87f739022be6948.json
@@ -0,0 +1,20 @@
1{
2 "db_name": "PostgreSQL",
3 "query": "SELECT times FROM devices;",
4 "describe": {
5 "columns": [
6 {
7 "ordinal": 0,
8 "name": "times",
9 "type_info": "Int8Array"
10 }
11 ],
12 "parameters": {
13 "Left": []
14 },
15 "nullable": [
16 true
17 ]
18 },
19 "hash": "5ac2b9a76338dd1342938cc09e37b14c828c4ee44b696d83f87f739022be6948"
20}
diff --git a/.sqlx/query-82c11b5a47389884e4ed945b87dfcee9067933c4df892609700c0766c20fc5c5.json b/.sqlx/query-62c84231c7e9c85dc91d71f6b4f7ee6dae2130c2109fb6f1e47e0990ec395744.json
index c0a933d..5ec47e3 100644
--- a/.sqlx/query-82c11b5a47389884e4ed945b87dfcee9067933c4df892609700c0766c20fc5c5.json
+++ b/.sqlx/query-62c84231c7e9c85dc91d71f6b4f7ee6dae2130c2109fb6f1e47e0990ec395744.json
@@ -1,6 +1,6 @@
1{ 1{
2 "db_name": "PostgreSQL", 2 "db_name": "PostgreSQL",
3 "query": "\n SELECT id, mac, broadcast_addr, ip\n FROM devices\n WHERE id = $1;\n ", 3 "query": "\n SELECT id, mac, broadcast_addr, ip, times\n FROM devices\n WHERE id = $1;\n ",
4 "describe": { 4 "describe": {
5 "columns": [ 5 "columns": [
6 { 6 {
@@ -22,6 +22,11 @@
22 "ordinal": 3, 22 "ordinal": 3,
23 "name": "ip", 23 "name": "ip",
24 "type_info": "Varchar" 24 "type_info": "Varchar"
25 },
26 {
27 "ordinal": 4,
28 "name": "times",
29 "type_info": "Int8Array"
25 } 30 }
26 ], 31 ],
27 "parameters": { 32 "parameters": {
@@ -33,8 +38,9 @@
33 false, 38 false,
34 false, 39 false,
35 false, 40 false,
36 false 41 false,
42 true
37 ] 43 ]
38 }, 44 },
39 "hash": "82c11b5a47389884e4ed945b87dfcee9067933c4df892609700c0766c20fc5c5" 45 "hash": "62c84231c7e9c85dc91d71f6b4f7ee6dae2130c2109fb6f1e47e0990ec395744"
40} 46}
diff --git a/Cargo.lock b/Cargo.lock
index 9873a4c..8f62060 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2187,7 +2187,7 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
2187 2187
2188[[package]] 2188[[package]]
2189name = "webol" 2189name = "webol"
2190version = "0.2.1" 2190version = "0.3.0"
2191dependencies = [ 2191dependencies = [
2192 "axum", 2192 "axum",
2193 "axum-macros", 2193 "axum-macros",
diff --git a/Cargo.toml b/Cargo.toml
index 5279abf..3207453 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
1[package] 1[package]
2name = "webol" 2name = "webol"
3version = "0.2.1" 3version = "0.3.0"
4edition = "2021" 4edition = "2021"
5 5
6# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html 6# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/migrations/20231009123228_devices.sql b/migrations/20231009123228_devices.sql
index b911b19..d36946c 100644
--- a/migrations/20231009123228_devices.sql
+++ b/migrations/20231009123228_devices.sql
@@ -4,5 +4,6 @@ CREATE TABLE IF NOT EXISTS "devices"
4 "id" VARCHAR(255) PRIMARY KEY NOT NULL, 4 "id" VARCHAR(255) PRIMARY KEY NOT NULL,
5 "mac" VARCHAR(17) NOT NULL, 5 "mac" VARCHAR(17) NOT NULL,
6 "broadcast_addr" VARCHAR(39) NOT NULL, 6 "broadcast_addr" VARCHAR(39) NOT NULL,
7 "ip" VARCHAR(39) NOT NULL 7 "ip" VARCHAR(39) NOT NULL,
8 "times" BIGINT[]
8) 9)
diff --git a/src/db.rs b/src/db.rs
index c012b47..8a6b16e 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -13,7 +13,8 @@ pub struct Device {
13 pub id: String, 13 pub id: String,
14 pub mac: String, 14 pub mac: String,
15 pub broadcast_addr: String, 15 pub broadcast_addr: String,
16 pub ip: String 16 pub ip: String,
17 pub times: Option<Vec<i64>>
17} 18}
18 19
19pub async fn init_db_pool() -> PgPool { 20pub async fn init_db_pool() -> PgPool {
diff --git a/src/routes/device.rs b/src/routes/device.rs
index 1eeff0b..678d117 100644
--- a/src/routes/device.rs
+++ b/src/routes/device.rs
@@ -16,7 +16,7 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head
16 let device = sqlx::query_as!( 16 let device = sqlx::query_as!(
17 Device, 17 Device,
18 r#" 18 r#"
19 SELECT id, mac, broadcast_addr, ip 19 SELECT id, mac, broadcast_addr, ip, times
20 FROM devices 20 FROM devices
21 WHERE id = $1; 21 WHERE id = $1;
22 "#, 22 "#,
@@ -79,7 +79,7 @@ pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: Hea
79 r#" 79 r#"
80 UPDATE devices 80 UPDATE devices
81 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 81 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4
82 RETURNING id, mac, broadcast_addr, ip; 82 RETURNING id, mac, broadcast_addr, ip, times;
83 "#, 83 "#,
84 payload.mac, 84 payload.mac,
85 payload.broadcast_addr, 85 payload.broadcast_addr,
diff --git a/src/routes/start.rs b/src/routes/start.rs
index 271f924..401ae97 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -22,7 +22,7 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
22 let device = sqlx::query_as!( 22 let device = sqlx::query_as!(
23 Device, 23 Device,
24 r#" 24 r#"
25 SELECT id, mac, broadcast_addr, ip 25 SELECT id, mac, broadcast_addr, ip, times
26 FROM devices 26 FROM devices
27 WHERE id = $1; 27 WHERE id = $1;
28 "#, 28 "#,
@@ -40,19 +40,20 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
40 &device.broadcast_addr.parse().map_err(WebolError::IpParse)?, 40 &device.broadcast_addr.parse().map_err(WebolError::IpParse)?,
41 create_buffer(&device.mac)? 41 create_buffer(&device.mac)?
42 )?; 42 )?;
43 43 let dev_id = device.id.clone();
44 let uuid = if payload.ping.is_some_and(|ping| ping) { 44 let uuid = if payload.ping.is_some_and(|ping| ping) {
45 let uuid_gen = Uuid::new_v4().to_string(); 45 let uuid_gen = Uuid::new_v4().to_string();
46 let uuid_genc = uuid_gen.clone(); 46 let uuid_genc = uuid_gen.clone();
47 // TODO: Check if service already runs
47 tokio::spawn(async move { 48 tokio::spawn(async move {
48 debug!("init ping service"); 49 debug!("init ping service");
49 state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); 50 state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false });
50 51
51 crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await 52 crate::services::ping::spawn(state.ping_send.clone(), device, uuid_gen.clone(), &state.ping_map, &state.db).await
52 }); 53 });
53 Some(uuid_genc) 54 Some(uuid_genc)
54 } else { None }; 55 } else { None };
55 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) 56 Ok(Json(json!(StartResponse { id: dev_id, boot: true, uuid })))
56 } else { 57 } else {
57 Err(WebolError::Generic) 58 Err(WebolError::Generic)
58 } 59 }
diff --git a/src/services/ping.rs b/src/services/ping.rs
index 67acc1c..2bff61f 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -5,11 +5,13 @@ use std::sync::Arc;
5use axum::extract::{ws::WebSocket}; 5use axum::extract::{ws::WebSocket};
6use axum::extract::ws::Message; 6use axum::extract::ws::Message;
7use dashmap::DashMap; 7use dashmap::DashMap;
8use sqlx::PgPool;
8use time::{Duration, Instant}; 9use time::{Duration, Instant};
9use tokio::sync::broadcast::{Sender}; 10use tokio::sync::broadcast::{Sender};
10use tracing::{debug, error, trace}; 11use tracing::{debug, error, trace};
11use crate::AppState; 12use crate::AppState;
12use crate::config::SETTINGS; 13use crate::config::SETTINGS;
14use crate::db::Device;
13 15
14pub type PingMap = DashMap<String, PingValue>; 16pub type PingMap = DashMap<String, PingValue>;
15 17
@@ -19,11 +21,11 @@ pub struct PingValue {
19 pub online: bool 21 pub online: bool
20} 22}
21 23
22pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { 24pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) {
23 let timer = Instant::now(); 25 let timer = Instant::now();
24 let payload = [0; 8]; 26 let payload = [0; 8];
25 27
26 let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); 28 let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip");
27 29
28 let mut msg: Option<BroadcastCommands> = None; 30 let mut msg: Option<BroadcastCommands> = None;
29 while msg.is_none() { 31 while msg.is_none() {
@@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping
52 54
53 let _ = tx.send(msg.clone()); 55 let _ = tx.send(msg.clone());
54 if let BroadcastCommands::Success(..) = msg { 56 if let BroadcastCommands::Success(..) = msg {
55 ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); 57 sqlx::query!(
58 r#"
59 UPDATE devices
60 SET times = array_append(times, $1)
61 WHERE id = $2;
62 "#,
63 timer.elapsed().whole_seconds(),
64 device.id
65 ).execute(db).await.unwrap();
66 ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true });
56 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 67 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
57 } 68 }
58 trace!("remove {} from ping_map", uuid); 69 trace!("remove {} from ping_map", uuid);
@@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
71 let msg = socket.recv().await; 82 let msg = socket.recv().await;
72 let uuid = msg.unwrap().unwrap().into_text().unwrap(); 83 let uuid = msg.unwrap().unwrap().into_text().unwrap();
73 84
74 trace!("Search for uuid: {:?}", uuid); 85 trace!("Search for uuid: {}", uuid);
86
87 let eta = get_eta(&state.db).await;
88 let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await;
75 89
76 let device_exists = state.ping_map.contains_key(&uuid); 90 let device_exists = state.ping_map.contains_key(&uuid);
77 match device_exists { 91 match device_exists {
@@ -87,6 +101,21 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
87 let _ = socket.close().await; 101 let _ = socket.close().await;
88} 102}
89 103
104async fn get_eta(db: &PgPool) -> i64 {
105 let query = sqlx::query!(
106 r#"SELECT times FROM devices;"#
107 ).fetch_optional(db).await.unwrap();
108
109 match query {
110 None => { -1 },
111 Some(rec) => {
112 let times = rec.times.unwrap();
113 times.iter().sum::<i64>() / times.len() as i64
114 }
115 }
116
117}
118
90async fn process_device(state: Arc<AppState>, uuid: String) -> Message { 119async fn process_device(state: Arc<AppState>, uuid: String) -> Message {
91 let pm = state.ping_map.clone().into_read_only(); 120 let pm = state.ping_map.clone().into_read_only();
92 let device = pm.get(&uuid).expect("fatal error"); 121 let device = pm.get(&uuid).expect("fatal error");