From 3428a637ce420baef9aa9f9803e71bd587867005 Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Wed, 10 Apr 2024 00:16:55 +0200 Subject: Closes #24. Changed postgres to json directory storage --- src/config.rs | 4 +- src/db.rs | 37 ---------------- src/error.rs | 8 ++-- src/main.rs | 33 ++++---------- src/routes/device.rs | 118 ++++++++++++--------------------------------------- src/routes/start.rs | 92 ++++++++------------------------------- src/routes/status.rs | 24 ++--------- src/services/ping.rs | 45 ++++++++++---------- src/storage.rs | 65 ++++++++++++++++++++++++++++ src/wol.rs | 20 --------- 10 files changed, 152 insertions(+), 294 deletions(-) delete mode 100644 src/db.rs create mode 100644 src/storage.rs (limited to 'src') diff --git a/src/config.rs b/src/config.rs index 9636af4..124893b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,7 +5,6 @@ use crate::auth; #[derive(Debug, Clone, Deserialize)] pub struct Config { - pub database_url: String, pub serveraddr: String, pub pingtimeout: i64, pub pingthreshold: i64, @@ -26,9 +25,10 @@ impl Config { .set_default("pingtimeout", 10)? .set_default("pingthreshold", 1)? .set_default("timeoffset", 0)? + .set_default("auth.secret", "")? .add_source(File::with_name("config.toml").required(false)) .add_source(File::with_name("config.dev.toml").required(false)) - .add_source(config::Environment::with_prefix("WEBOL").prefix_separator("_")) + .add_source(config::Environment::with_prefix("WEBOL").separator("_")) .build()?; config.try_deserialize() diff --git a/src/db.rs b/src/db.rs deleted file mode 100644 index a2b2009..0000000 --- a/src/db.rs +++ /dev/null @@ -1,37 +0,0 @@ -use serde::Serialize; -use sqlx::{PgPool, postgres::PgPoolOptions, types::{ipnetwork::IpNetwork, mac_address::MacAddress}}; -use tracing::{debug, info}; -use utoipa::ToSchema; - -#[derive(Serialize, Debug)] -pub struct Device { - pub id: String, - pub mac: MacAddress, - pub broadcast_addr: String, - pub ip: IpNetwork, - pub times: Option> -} - -#[derive(ToSchema)] -#[schema(as = Device)] -pub struct DeviceSchema { - pub id: String, - pub mac: String, - pub broadcast_addr: String, - pub ip: String, - pub times: Option> -} - -pub async fn init_db_pool(db_url: &str) -> PgPool { - debug!("attempt to connect dbPool to '{}'", db_url); - - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(db_url) - .await - .unwrap(); - - info!("dbPool successfully connected to '{}'", db_url); - - pool -} diff --git a/src/error.rs b/src/error.rs index 006fcdb..8a011bf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,10 +11,10 @@ use tracing::error; #[derive(Debug, thiserror::Error, ToSchema)] pub enum Error { - #[error("db: {source}")] - Db { + #[error("json: {source}")] + Json { #[from] - source: sqlx::Error, + source: serde_json::Error, }, #[error("buffer parse: {source}")] @@ -52,7 +52,7 @@ impl IntoResponse for Error { fn into_response(self) -> Response { error!("{}", self.to_string()); let (status, error_message) = match self { - Self::Db { source } => { + Self::Json { source } => { error!("{source}"); (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") } diff --git a/src/main.rs b/src/main.rs index 70c67cf..cf0d39b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,5 @@ use crate::{ - config::Config, - db::init_db_pool, - routes::{device, start, status}, - services::ping::{BroadcastCommand, StatusMap}, + config::Config, routes::{device, start, status}, services::ping::{BroadcastCommand, StatusMap}, storage::Device }; use axum::{ middleware::from_fn_with_state, @@ -10,7 +7,6 @@ use axum::{ Router, }; use dashmap::DashMap; -use sqlx::PgPool; use std::{env, sync::Arc}; use time::UtcOffset; use tokio::sync::broadcast::{channel, Sender}; @@ -26,10 +22,10 @@ use utoipa::{ }; use utoipa_swagger_ui::SwaggerUi; +mod auth; mod config; -mod db; +mod storage; mod error; -mod auth; mod routes; mod services; mod wol; @@ -39,20 +35,16 @@ mod wol; paths( start::post, start::get, - start::start_payload, device::get, - device::get_payload, device::post, device::put, ), components( schemas( - start::PayloadOld, start::Payload, start::Response, - device::DevicePayload, - device::GetDevicePayload, - db::DeviceSchema, + device::Payload, + storage::DeviceSchema, ) ), modifiers(&SecurityAddon), @@ -99,34 +91,26 @@ async fn main() -> color_eyre::eyre::Result<()> { ) .init(); - let version = env!("CARGO_PKG_VERSION"); + Device::setup()?; + let version = env!("CARGO_PKG_VERSION"); info!("start webol v{}", version); - let db = init_db_pool(&config.database_url).await; - sqlx::migrate!().run(&db).await.unwrap(); - let (tx, _) = channel(32); let ping_map: StatusMap = DashMap::new(); let shared_state = AppState { - db, config: config.clone(), ping_send: tx, ping_map, }; let app = Router::new() - .route("/start", post(start::start_payload)) .route("/start/:id", post(start::post).get(start::get)) - .route( - "/device", - post(device::post).get(device::get_payload).put(device::put), - ) + .route("/device", post(device::post).put(device::put)) .route("/device/:id", get(device::get)) .route("/status", get(status::status)) - // TODO: Don't load on `None` Auth .route_layer(from_fn_with_state(shared_state.clone(), auth::auth)) .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())) .with_state(Arc::new(shared_state)); @@ -141,7 +125,6 @@ async fn main() -> color_eyre::eyre::Result<()> { #[derive(Clone)] pub struct AppState { - db: PgPool, config: Config, ping_send: Sender, ping_map: StatusMap, diff --git a/src/routes/device.rs b/src/routes/device.rs index 40b5cd8..b6bd9d0 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs @@ -1,47 +1,15 @@ -use crate::db::Device; use crate::error::Error; -use axum::extract::{Path, State}; +use crate::storage::Device; +use axum::extract::Path; use axum::Json; +use ipnetwork::IpNetwork; use mac_address::MacAddress; use serde::Deserialize; use serde_json::{json, Value}; -use sqlx::types::ipnetwork::IpNetwork; -use std::{str::FromStr, sync::Arc}; +use std::str::FromStr; use tracing::{debug, info}; use utoipa::ToSchema; -#[utoipa::path( - get, - path = "/device", - request_body = GetDevicePayload, - responses( - (status = 200, description = "Get `Device` information", body = [Device]) - ), - security(("api_key" = [])) -)] -#[deprecated] -pub async fn get_payload( - State(state): State>, - Json(payload): Json, -) -> Result, Error> { - info!("get device {}", payload.id); - let device = sqlx::query_as!( - Device, - r#" - SELECT id, mac, broadcast_addr, ip, times - FROM devices - WHERE id = $1; - "#, - payload.id - ) - .fetch_one(&state.db) - .await?; - - debug!("got device {:?}", device); - - Ok(Json(json!(device))) -} - #[utoipa::path( get, path = "/device/{id}", @@ -53,22 +21,10 @@ pub async fn get_payload( ), security((), ("api_key" = [])) )] -pub async fn get( - State(state): State>, - Path(path): Path, -) -> Result, Error> { - info!("get device from path {}", path); - let device = sqlx::query_as!( - Device, - r#" - SELECT id, mac, broadcast_addr, ip, times - FROM devices - WHERE id = $1; - "#, - path - ) - .fetch_one(&state.db) - .await?; +pub async fn get(Path(id): Path) -> Result, Error> { + info!("get device from path {}", id); + + let device = Device::read(&id)?; debug!("got device {:?}", device); @@ -76,13 +32,7 @@ pub async fn get( } #[derive(Deserialize, ToSchema)] -#[deprecated] -pub struct GetDevicePayload { - id: String, -} - -#[derive(Deserialize, ToSchema)] -pub struct DevicePayload { +pub struct Payload { id: String, mac: String, broadcast_addr: String, @@ -92,15 +42,14 @@ pub struct DevicePayload { #[utoipa::path( put, path = "/device", - request_body = DevicePayload, + request_body = Payload, responses( (status = 200, description = "add device to storage", body = [DeviceSchema]) ), security((), ("api_key" = [])) )] pub async fn put( - State(state): State>, - Json(payload): Json, + Json(payload): Json, ) -> Result, Error> { info!( "add device {} ({}, {}, {})", @@ -109,20 +58,14 @@ pub async fn put( let ip = IpNetwork::from_str(&payload.ip)?; let mac = MacAddress::from_str(&payload.mac)?; - let device = sqlx::query_as!( - Device, - r#" - INSERT INTO devices (id, mac, broadcast_addr, ip) - VALUES ($1, $2, $3, $4) - RETURNING id, mac, broadcast_addr, ip, times; - "#, - payload.id, + let device = Device { + id: payload.id, mac, - payload.broadcast_addr, - ip - ) - .fetch_one(&state.db) - .await?; + broadcast_addr: payload.broadcast_addr, + ip, + times: None, + }; + device.write()?; Ok(Json(json!(device))) } @@ -130,15 +73,14 @@ pub async fn put( #[utoipa::path( post, path = "/device", - request_body = DevicePayload, + request_body = Payload, responses( (status = 200, description = "update device in storage", body = [DeviceSchema]) ), security((), ("api_key" = [])) )] pub async fn post( - State(state): State>, - Json(payload): Json, + Json(payload): Json, ) -> Result, Error> { info!( "edit device {} ({}, {}, {})", @@ -146,20 +88,16 @@ pub async fn post( ); let ip = IpNetwork::from_str(&payload.ip)?; let mac = MacAddress::from_str(&payload.mac)?; - let device = sqlx::query_as!( - Device, - r#" - UPDATE devices - SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 - RETURNING id, mac, broadcast_addr, ip, times; - "#, + let times = Device::read(&payload.id)?.times; + + let device = Device { + id: payload.id, mac, - payload.broadcast_addr, + broadcast_addr: payload.broadcast_addr, ip, - payload.id - ) - .fetch_one(&state.db) - .await?; + times, + }; + device.write()?; Ok(Json(json!(device))) } diff --git a/src/routes/start.rs b/src/routes/start.rs index ff3d1be..6907193 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -1,7 +1,7 @@ -use crate::db::Device; +use crate::storage::Device; use crate::error::Error; use crate::services::ping::Value as PingValue; -use crate::wol::{create_buffer, send_packet}; +use crate::wol::send_packet; use axum::extract::{Path, State}; use axum::Json; use serde::{Deserialize, Serialize}; @@ -11,55 +11,6 @@ use tracing::{debug, info}; use utoipa::ToSchema; use uuid::Uuid; -#[utoipa::path( - post, - path = "/start", - request_body = PayloadOld, - responses( - (status = 200, description = "DEP", body = [Response]) - ), - security((), ("api_key" = [])) -)] -#[deprecated] -pub async fn start_payload( - State(state): State>, - Json(payload): Json, -) -> Result, Error> { - info!("POST request"); - let device = sqlx::query_as!( - Device, - r#" - SELECT id, mac, broadcast_addr, ip, times - FROM devices - WHERE id = $1; - "#, - payload.id - ) - .fetch_one(&state.db) - .await?; - - info!("starting {}", device.id); - - let bind_addr = "0.0.0.0:0"; - - let _ = send_packet( - bind_addr, - &device.broadcast_addr, - &create_buffer(&device.mac.to_string())?, - )?; - let dev_id = device.id.clone(); - let uuid = if payload.ping.is_some_and(|ping| ping) { - Some(setup_ping(state, device)) - } else { - None - }; - Ok(Json(json!(Response { - id: dev_id, - boot: true, - uuid - }))) -} - #[utoipa::path( post, path = "/start/{id}", @@ -77,7 +28,7 @@ pub async fn post( Path(id): Path, payload: Option>, ) -> Result, Error> { - send_wol(state, &id, payload).await + send_wol(state, &id, payload) } #[utoipa::path( @@ -95,26 +46,16 @@ pub async fn get( State(state): State>, Path(id): Path, ) -> Result, Error> { - send_wol(state, &id, None).await + send_wol(state, &id, None) } -async fn send_wol( +fn send_wol( state: Arc, id: &str, payload: Option>, ) -> Result, Error> { - info!("Start request for {id}"); - let device = sqlx::query_as!( - Device, - r#" - SELECT id, mac, broadcast_addr, ip, times - FROM devices - WHERE id = $1; - "#, - id - ) - .fetch_one(&state.db) - .await?; + info!("start request for {id}"); + let device = Device::read(id)?; info!("starting {}", device.id); @@ -122,8 +63,8 @@ async fn send_wol( let _ = send_packet( bind_addr, - &device.broadcast_addr, - &create_buffer(&device.mac.to_string())?, + &device.broadcast_addr.to_string(), + &device.mac.bytes() )?; let dev_id = device.id.clone(); let uuid = if let Some(pl) = payload { @@ -163,6 +104,7 @@ fn setup_ping(state: Arc, device: Device) -> String { uuid_gen.clone(), PingValue { ip: device.ip, + eta: get_eta(device.clone().times), online: false, }, ); @@ -174,7 +116,6 @@ fn setup_ping(state: Arc, device: Device) -> String { device, uuid_gen, &state.ping_map, - &state.db, ) .await; }); @@ -182,11 +123,14 @@ fn setup_ping(state: Arc, device: Device) -> String { uuid_ret } -#[derive(Deserialize, ToSchema)] -#[deprecated] -pub struct PayloadOld { - id: String, - ping: Option, +fn get_eta(times: Option>) -> i64 { + let times = if let Some(times) = times { + times + } else { + vec![0] + }; + + times.iter().sum::() / i64::try_from(times.len()).unwrap() } #[derive(Deserialize, ToSchema)] diff --git a/src/routes/status.rs b/src/routes/status.rs index 0e25f7d..b38202b 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs @@ -3,7 +3,6 @@ use crate::AppState; use axum::extract::ws::{Message, WebSocket}; use axum::extract::{State, WebSocketUpgrade}; use axum::response::Response; -use sqlx::PgPool; use std::sync::Arc; use tracing::{debug, trace}; @@ -18,13 +17,13 @@ pub async fn websocket(mut socket: WebSocket, state: Arc) { trace!("Search for uuid: {}", uuid); - let eta = get_eta(&state.db).await; - let _ = socket - .send(Message::Text(format!("eta_{eta}_{uuid}"))) - .await; let device_exists = state.ping_map.contains_key(&uuid); if device_exists { + let eta = state.ping_map.get(&uuid).unwrap().eta; + let _ = socket + .send(Message::Text(format!("eta_{eta}_{uuid}"))) + .await; let _ = socket .send(receive_ping_broadcast(state.clone(), uuid).await) .await; @@ -62,18 +61,3 @@ async fn receive_ping_broadcast(state: Arc, uuid: String) -> Message { } } } - -async fn get_eta(db: &PgPool) -> i64 { - let query = sqlx::query!(r#"SELECT times FROM devices;"#) - .fetch_one(db) - .await - .unwrap(); - - let times = if let Some(times) = query.times { - times - } else { - vec![0] - }; - - times.iter().sum::() / i64::try_from(times.len()).unwrap() -} diff --git a/src/services/ping.rs b/src/services/ping.rs index 8cf6072..1bf022d 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,8 +1,7 @@ use crate::config::Config; -use crate::db::Device; +use crate::storage::Device; use dashmap::DashMap; use ipnetwork::IpNetwork; -use sqlx::PgPool; use std::fmt::Display; use time::{Duration, Instant}; use tokio::sync::broadcast::Sender; @@ -13,6 +12,7 @@ pub type StatusMap = DashMap; #[derive(Debug, Clone)] pub struct Value { pub ip: IpNetwork, + pub eta: i64, pub online: bool, } @@ -22,7 +22,6 @@ pub async fn spawn( device: Device, uuid: String, ping_map: &StatusMap, - db: &PgPool, ) { let timer = Instant::now(); let payload = [0; 8]; @@ -56,27 +55,29 @@ pub async fn spawn( let _ = tx.send(msg.clone()); if msg.command == BroadcastCommands::Success { if timer.elapsed().whole_seconds() > config.pingthreshold { - sqlx::query!( - r#" - UPDATE devices - SET times = array_append(times, $1) - WHERE id = $2; - "#, - timer.elapsed().whole_seconds(), - device.id - ) - .execute(db) - .await - .unwrap(); + let newtimes = if let Some(mut oldtimes) = device.times { + oldtimes.push(timer.elapsed().whole_seconds()); + oldtimes + } else { + vec![timer.elapsed().whole_seconds()] + }; + + let updatedev = Device { + id: device.id, + mac: device.mac, + broadcast_addr: device.broadcast_addr, + ip: device.ip, + times: Some(newtimes), + }; + updatedev.write().unwrap(); } - ping_map.insert( - uuid.clone(), - Value { - ip: device.ip, - online: true, - }, - ); + ping_map.alter(&uuid, |_, v| Value { + ip: v.ip, + eta: v.eta, + online: true, + }); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; } trace!("remove {} from ping_map", uuid); diff --git a/src/storage.rs b/src/storage.rs new file mode 100644 index 0000000..6ba5ee1 --- /dev/null +++ b/src/storage.rs @@ -0,0 +1,65 @@ +use std::{ + fs::{create_dir_all, File}, + io::{Read, Write}, + path::Path, +}; + +use ipnetwork::IpNetwork; +use mac_address::MacAddress; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tracing::{debug, warn}; +use utoipa::ToSchema; + +use crate::error::Error; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Device { + pub id: String, + pub mac: MacAddress, + pub broadcast_addr: String, + pub ip: IpNetwork, + pub times: Option>, +} + +impl Device { + const STORAGE_PATH: &'static str = "devices"; + + pub fn setup() -> Result { + let sp = Path::new(Self::STORAGE_PATH); + if !sp.exists() { + warn!("device storage path doesn't exist, creating it"); + create_dir_all(Self::STORAGE_PATH)?; + }; + + debug!("device storage at '{}'", Self::STORAGE_PATH); + + Ok(Self::STORAGE_PATH.to_string()) + } + + pub fn read(id: &str) -> Result { + let mut file = File::open(format!("{}/{id}.json", Self::STORAGE_PATH))?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + + let dev = serde_json::from_str(&buf)?; + Ok(dev) + } + + pub fn write(&self) -> Result<(), Error> { + let mut file = File::create(format!("{}/{}.json", Self::STORAGE_PATH, self.id))?; + file.write_all(json!(self).to_string().as_bytes())?; + + Ok(()) + } +} + +#[derive(ToSchema)] +#[schema(as = Device)] +pub struct DeviceSchema { + pub id: String, + pub mac: String, + pub broadcast_addr: String, + pub ip: String, + pub times: Option>, +} diff --git a/src/wol.rs b/src/wol.rs index 31cf350..6392366 100644 --- a/src/wol.rs +++ b/src/wol.rs @@ -2,26 +2,6 @@ use std::net::{ToSocketAddrs, UdpSocket}; use crate::error::Error; -/// Creates the magic packet from a mac address -/// -/// # Panics -/// -/// Panics if `mac_addr` is an invalid mac -pub fn create_buffer(mac_addr: &str) -> Result, Error> { - let mut mac = Vec::new(); - let sp = mac_addr.split(':'); - for f in sp { - mac.push(u8::from_str_radix(f, 16)?); - } - let mut buf = vec![255; 6]; - for _ in 0..16 { - for i in &mac { - buf.push(*i); - } - } - Ok(buf) -} - /// Sends a buffer on UDP broadcast pub fn send_packet( bind_addr: A, -- cgit v1.2.3