aboutsummaryrefslogtreecommitdiff
path: root/src/services
diff options
context:
space:
mode:
authorfx <[email protected]>2023-10-25 12:53:31 +0200
committerfx <[email protected]>2023-10-25 12:53:31 +0200
commit00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a (patch)
tree7906b5836f3ca24686b1b7418c2128b93c33a398 /src/services
parentf9224ff02e688dec819ab81893320a0611f2a198 (diff)
downloadwebol-00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a.tar
webol-00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a.tar.gz
webol-00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a.zip
runs, no error handling
Diffstat (limited to 'src/services')
-rw-r--r--src/services/ping.rs79
1 files changed, 56 insertions, 23 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs
index ff328a5..e3d465d 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -1,26 +1,29 @@
1use std::borrow::Cow; 1use std::borrow::Cow;
2use std::collections::HashMap;
2use std::sync::Arc; 3use std::sync::Arc;
3 4
4use axum::{extract::{WebSocketUpgrade, ws::WebSocket, State}, response::Response}; 5use axum::extract::{ws::WebSocket};
6use axum::extract::ws::Message;
5use tokio::sync::broadcast::{Sender}; 7use tokio::sync::broadcast::{Sender};
6use tracing::{debug, error, trace}; 8use tokio::sync::Mutex;
9use tracing::{debug, error, trace, warn};
7 10
8use crate::{error::WebolError, AppState}; 11use crate::error::WebolError;
9 12
10pub async fn spawn(tx: Sender<String>) -> Result<(), WebolError> { 13pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> {
11 let payload = [0; 8]; 14 let payload = [0; 8];
12 15
13 let mut cont = true; 16 let mut cont = true;
14 while cont { 17 while cont {
15 let ping = surge_ping::ping( 18 let ping = surge_ping::ping(
16 "127.0.0.1".parse().map_err(WebolError::IpParse)?, 19 ip.parse().map_err(WebolError::IpParse)?,
17 &payload 20 &payload
18 ).await; 21 ).await;
19 22
20 if let Err(ping) = ping { 23 if let Err(ping) = ping {
21 cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); 24 cont = matches!(ping, surge_ping::SurgeError::Timeout { .. });
22 25
23 debug!("{}", cont); 26 // debug!("{}", cont);
24 27
25 if !cont { 28 if !cont {
26 return Err(ping).map_err(WebolError::Ping) 29 return Err(ping).map_err(WebolError::Ping)
@@ -31,29 +34,59 @@ pub async fn spawn(tx: Sender<String>) -> Result<(), WebolError> {
31 debug!("Ping took {:?}", duration); 34 debug!("Ping took {:?}", duration);
32 cont = false; 35 cont = false;
33 // FIXME: remove unwrap 36 // FIXME: remove unwrap
34 tx.send("Got ping".to_string()).unwrap(); 37 // FIXME: if error: SendError because no listener, then handle the entry directly
38 tx.send(ip.clone());
35 }; 39 };
36 } 40 }
37 41
38 Ok(()) 42 Ok(())
39} 43}
40 44
41// TODO: Status to routes, websocket here
42pub async fn ws_ping(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response {
43 ws.on_upgrade(move |socket| handle_socket(socket, state.ping_send.clone()))
44}
45
46// FIXME: Handle commands through enum 45// FIXME: Handle commands through enum
47async fn handle_socket(mut socket: WebSocket, tx: Sender<String>) { 46pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>) {
48 // TODO: Understand Cow 47 warn!("{:?}", ping_map);
49 while let message = tx.subscribe().recv().await.unwrap() { 48
50 trace!("GOT = {}", message); 49 let mut uuid: Option<String> = None;
51 if &message == "Got ping" { 50
52 break; 51 trace!("wait for ws message (uuid)");
52 let msg = socket.recv().await;
53 uuid = Some(msg.unwrap().unwrap().into_text().unwrap());
54
55 let uuid = uuid.unwrap();
56
57 trace!("Search for uuid: {:?}", uuid);
58
59 let device = ping_map.lock().await.get(&uuid).unwrap().to_owned();
60
61 trace!("got device: {:?}", device);
62
63 match device.1 {
64 true => {
65 debug!("already started");
66 socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
67 socket.close().await.unwrap();
68 },
69 false => {
70 let ip = device.0.to_owned();
71 let mut i = 0;
72 loop{
73 trace!("{}", i);
74 // TODO: Check if older than 10 minutes, close if true
75 trace!("wait for tx message");
76 let message = tx.subscribe().recv().await.unwrap();
77 trace!("GOT = {}", message);
78 if message == ip {
79 trace!("message == ip");
80 break;
81 }
82 i += 1;
83 };
84
85 socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
86 socket.close().await.unwrap();
87 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
88 ping_map.lock().await.remove(&uuid);
89 warn!("{:?}", ping_map);
53 } 90 }
54 }; 91 }
55 match socket.send(axum::extract::ws::Message::Close(Some(axum::extract::ws::CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) {
56 Ok(..) => (),
57 Err(err) => { error!("Server Error: {:?}", err) }
58 };
59} \ No newline at end of file 92} \ No newline at end of file