From f9224ff02e688dec819ab81893320a0611f2a198 Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Tue, 24 Oct 2023 14:56:17 +0200 Subject: Seems to work --- src/main.rs | 12 ++---------- src/routes/start.rs | 10 +++++++--- src/services/ping.rs | 28 +++++++++++++++++----------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9c31ec8..124c44e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use axum::{Router, routing::post}; use axum::routing::{get, put}; use sqlx::PgPool; use time::util::local_offset; -use tokio::sync::mpsc::{self, Sender}; +use tokio::sync::broadcast::{channel, Sender}; use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*}; use crate::config::SETTINGS; @@ -46,15 +46,8 @@ async fn main() { let db = init_db_pool().await; sqlx::migrate!().run(&db).await.unwrap(); - let (tx, mut rx) = mpsc::channel(32); + let (tx, _) = channel(32); - // FIXME: once_cell? or just static mutable - tokio::spawn( async move { - while let Some(message) = rx.recv().await { - println!("GOT = {}", message); - } - }); - let shared_state = Arc::new(AppState { db, ping_send: tx }); let app = Router::new() @@ -76,5 +69,4 @@ async fn main() { pub struct AppState { db: PgPool, ping_send: Sender, - // ping_receive: Receiver } diff --git a/src/routes/start.rs b/src/routes/start.rs index b45fe5b..863ef16 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use axum::extract::State; use serde_json::{json, Value}; -use tracing::info; +use tracing::{debug, info}; use crate::auth::auth; use crate::config::SETTINGS; use crate::wol::{create_buffer, send_packet}; @@ -39,8 +39,12 @@ pub async fn start(State(state): State>, headers: HeaderMap )?; if payload.ping.is_some_and(|ping| ping) { - tokio::spawn(async move {crate::services::ping::spawn(state.ping_send.clone()).await}); - } + debug!("ping true"); + tokio::spawn(async move { + debug!("Init ping service"); + crate::services::ping::spawn(state.ping_send.clone()).await + }); + }; Ok(Json(json!(StartResponse { id: device.id, boot: true }))) } else { Err(WebolError::Generic) diff --git a/src/services/ping.rs b/src/services/ping.rs index 6e710ec..ff328a5 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,8 +1,9 @@ +use std::borrow::Cow; use std::sync::Arc; use axum::{extract::{WebSocketUpgrade, ws::WebSocket, State}, response::Response}; -use tokio::sync::mpsc::Sender; -use tracing::{debug, error}; +use tokio::sync::broadcast::{Sender}; +use tracing::{debug, error, trace}; use crate::{error::WebolError, AppState}; @@ -12,7 +13,7 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { let mut cont = true; while cont { let ping = surge_ping::ping( - "192.168.178.28".parse().map_err(WebolError::IpParse)?, + "127.0.0.1".parse().map_err(WebolError::IpParse)?, &payload ).await; @@ -30,24 +31,29 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { debug!("Ping took {:?}", duration); cont = false; // FIXME: remove unwrap - tx.send("Got ping".to_string()).await.unwrap(); + tx.send("Got ping".to_string()).unwrap(); }; } Ok(()) } -pub async fn ws_ping(ws: WebSocketUpgrade, State(_state): State>) -> Response { - ws.on_upgrade(handle_socket) +// TODO: Status to routes, websocket here +pub async fn ws_ping(State(state): State>, ws: WebSocketUpgrade) -> Response { + ws.on_upgrade(move |socket| handle_socket(socket, state.ping_send.clone())) } // FIXME: Handle commands through enum -async fn handle_socket(mut socket: WebSocket) { +async fn handle_socket(mut socket: WebSocket, tx: Sender) { // TODO: Understand Cow - - // match socket.send(axum::extract::ws::Message::Close(Some(CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { - match socket.send(axum::extract::ws::Message::Text("started".to_string())).await.map_err(WebolError::Axum) { + while let message = tx.subscribe().recv().await.unwrap() { + trace!("GOT = {}", message); + if &message == "Got ping" { + break; + } + }; + match socket.send(axum::extract::ws::Message::Close(Some(axum::extract::ws::CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { Ok(..) => (), Err(err) => { error!("Server Error: {:?}", err) } }; -} +} \ No newline at end of file -- cgit v1.2.3