From 00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a Mon Sep 17 00:00:00 2001 From: fx Date: Wed, 25 Oct 2023 12:53:31 +0200 Subject: runs, no error handling --- Cargo.lock | 24 ++++++++++++++++ Cargo.toml | 2 ++ src/main.rs | 13 ++++++--- src/routes/mod.rs | 3 +- src/routes/start.rs | 24 +++++++++++----- src/routes/status.rs | 12 ++++++++ src/services/ping.rs | 79 +++++++++++++++++++++++++++++++++++++--------------- 7 files changed, 122 insertions(+), 35 deletions(-) create mode 100644 src/routes/status.rs diff --git a/Cargo.lock b/Cargo.lock index bf813bf..2650edc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -2127,6 +2139,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" +dependencies = [ + "getrandom", + "rand", +] + [[package]] name = "valuable" version = "0.1.0" @@ -2165,6 +2187,7 @@ name = "webol" version = "0.1.0" dependencies = [ "axum", + "axum-macros", "config", "once_cell", "serde", @@ -2175,6 +2198,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9bdc4da..d29a1b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,5 @@ config = "0.13.3" once_cell = "1.18.0" sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio"]} surge-ping = "0.8.0" +axum-macros = "0.3.8" +uuid = { version = "1.5.0", features = ["v4", "fast-rng"] } diff --git a/src/main.rs b/src/main.rs index 124c44e..854b59d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::env; use std::sync::Arc; use axum::{Router, routing::post}; @@ -5,13 +6,14 @@ use axum::routing::{get, put}; use sqlx::PgPool; use time::util::local_offset; use tokio::sync::broadcast::{channel, Sender}; +use tokio::sync::Mutex; use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*}; use crate::config::SETTINGS; use crate::db::init_db_pool; use crate::routes::device::{get_device, post_device, put_device}; use crate::routes::start::start; -use crate::services::ping::ws_ping; +use crate::routes::status::status; mod auth; mod config; @@ -47,15 +49,17 @@ async fn main() { sqlx::migrate!().run(&db).await.unwrap(); let (tx, _) = channel(32); + + let ping_map: HashMap = HashMap::new(); - let shared_state = Arc::new(AppState { db, ping_send: tx }); + let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(Mutex::new(ping_map)) }); let app = Router::new() .route("/start", post(start)) .route("/device", get(get_device)) .route("/device", put(put_device)) .route("/device", post(post_device)) - .route("/status", get(ws_ping)) + .route("/status", get(status)) .with_state(shared_state); let addr = SETTINGS.get_string("serveraddr").unwrap_or("0.0.0.0:7229".to_string()); @@ -69,4 +73,5 @@ async fn main() { pub struct AppState { db: PgPool, ping_send: Sender, -} + ping_map: Arc>>, +} \ No newline at end of file diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 12fbfab..d5ab0d6 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,2 +1,3 @@ pub mod start; -pub mod device; \ No newline at end of file +pub mod device; +pub mod status; \ No newline at end of file diff --git a/src/routes/start.rs b/src/routes/start.rs index 863ef16..45e7ec8 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -4,15 +4,18 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use axum::extract::State; use serde_json::{json, Value}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; +use uuid::Uuid; use crate::auth::auth; use crate::config::SETTINGS; use crate::wol::{create_buffer, send_packet}; use crate::db::Device; use crate::error::WebolError; +#[axum_macros::debug_handler] pub async fn start(State(state): State>, headers: HeaderMap, Json(payload): Json) -> Result, WebolError> { info!("POST request"); + warn!("{:?}", state.ping_map); let secret = headers.get("authorization"); let authorized = auth(secret).map_err(WebolError::Auth)?; if authorized { @@ -38,14 +41,20 @@ pub async fn start(State(state): State>, headers: HeaderMap create_buffer(&device.mac)? )?; - if payload.ping.is_some_and(|ping| ping) { - debug!("ping true"); - tokio::spawn(async move { + let uuid = if payload.ping.is_some_and(|ping| ping) { + let uuid_gen = Uuid::new_v4().to_string(); + let uuid_genc = uuid_gen.clone(); + tokio::spawn(async move{ debug!("Init ping service"); - crate::services::ping::spawn(state.ping_send.clone()).await + state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); + + warn!("{:?}", state.ping_map); + + crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; }); - }; - Ok(Json(json!(StartResponse { id: device.id, boot: true }))) + Some(uuid_genc) + } else { None }; + Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) } else { Err(WebolError::Generic) } @@ -61,4 +70,5 @@ pub struct StartPayload { struct StartResponse { id: String, boot: bool, + uuid: Option, } diff --git a/src/routes/status.rs b/src/routes/status.rs new file mode 100644 index 0000000..cdecf6a --- /dev/null +++ b/src/routes/status.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; +use axum::extract::{State, WebSocketUpgrade}; +use axum::response::Response; +use serde::Deserialize; +use crate::AppState; +use crate::services::ping::status_websocket; + +#[axum_macros::debug_handler] +pub async fn status(State(state): State>, ws: WebSocketUpgrade) -> Response { + // TODO: remove unwrap + ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) +} \ No newline at end of file diff --git a/src/services/ping.rs b/src/services/ping.rs index ff328a5..e3d465d 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,26 +1,29 @@ use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; -use axum::{extract::{WebSocketUpgrade, ws::WebSocket, State}, response::Response}; +use axum::extract::{ws::WebSocket}; +use axum::extract::ws::Message; use tokio::sync::broadcast::{Sender}; -use tracing::{debug, error, trace}; +use tokio::sync::Mutex; +use tracing::{debug, error, trace, warn}; -use crate::{error::WebolError, AppState}; +use crate::error::WebolError; -pub async fn spawn(tx: Sender) -> Result<(), WebolError> { +pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { let payload = [0; 8]; let mut cont = true; while cont { let ping = surge_ping::ping( - "127.0.0.1".parse().map_err(WebolError::IpParse)?, + ip.parse().map_err(WebolError::IpParse)?, &payload ).await; if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); - debug!("{}", cont); + // debug!("{}", cont); if !cont { return Err(ping).map_err(WebolError::Ping) @@ -31,29 +34,59 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { debug!("Ping took {:?}", duration); cont = false; // FIXME: remove unwrap - tx.send("Got ping".to_string()).unwrap(); + // FIXME: if error: SendError because no listener, then handle the entry directly + tx.send(ip.clone()); }; } Ok(()) } -// TODO: Status to routes, websocket here -pub async fn ws_ping(State(state): State>, ws: WebSocketUpgrade) -> Response { - ws.on_upgrade(move |socket| handle_socket(socket, state.ping_send.clone())) -} - // FIXME: Handle commands through enum -async fn handle_socket(mut socket: WebSocket, tx: Sender) { - // TODO: Understand Cow - while let message = tx.subscribe().recv().await.unwrap() { - trace!("GOT = {}", message); - if &message == "Got ping" { - break; +pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: Arc>>) { + warn!("{:?}", ping_map); + + let mut uuid: Option = None; + + trace!("wait for ws message (uuid)"); + let msg = socket.recv().await; + uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); + + let uuid = uuid.unwrap(); + + trace!("Search for uuid: {:?}", uuid); + + let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); + + trace!("got device: {:?}", device); + + match device.1 { + true => { + debug!("already started"); + socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + socket.close().await.unwrap(); + }, + false => { + let ip = device.0.to_owned(); + let mut i = 0; + loop{ + trace!("{}", i); + // TODO: Check if older than 10 minutes, close if true + trace!("wait for tx message"); + let message = tx.subscribe().recv().await.unwrap(); + trace!("GOT = {}", message); + if message == ip { + trace!("message == ip"); + break; + } + i += 1; + }; + + socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + socket.close().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + ping_map.lock().await.remove(&uuid); + warn!("{:?}", ping_map); } - }; - match socket.send(axum::extract::ws::Message::Close(Some(axum::extract::ws::CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { - Ok(..) => (), - Err(err) => { error!("Server Error: {:?}", err) } - }; + } } \ No newline at end of file -- cgit v1.2.3