diff --git a/Cargo.lock b/Cargo.lock index d5a7535..c623084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -582,6 +582,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.5.0" @@ -1170,6 +1183,7 @@ dependencies = [ "chrono", "clap", "color-eyre", + "dashmap", "eyre", "foxchat", "futures", diff --git a/identity/Cargo.toml b/identity/Cargo.toml index e9b6254..f6e86a2 100644 --- a/identity/Cargo.toml +++ b/identity/Cargo.toml @@ -29,3 +29,4 @@ reqwest = { version = "0.11.23", features = ["json", "gzip", "brotli", "multipar chrono = "0.4.31" futures = "0.3.30" tokio-tungstenite = { version = "0.21.0", features = ["rustls"] } +dashmap = "5.5.3" diff --git a/identity/src/app_state.rs b/identity/src/app_state.rs index 3da8486..3b15063 100644 --- a/identity/src/app_state.rs +++ b/identity/src/app_state.rs @@ -1,5 +1,7 @@ +use foxchat::s2s::Dispatch; use rsa::{RsaPrivateKey, RsaPublicKey}; use sqlx::{Pool, Postgres}; +use tokio::sync::broadcast::Sender; use crate::config::Config; @@ -8,4 +10,5 @@ pub struct AppState { pub config: Config, pub private_key: RsaPrivateKey, pub public_key: RsaPublicKey, + pub broadcast: Sender, } diff --git a/identity/src/fed/mod.rs b/identity/src/fed/mod.rs index 40e04f7..b0e50bb 100644 --- a/identity/src/fed/mod.rs +++ b/identity/src/fed/mod.rs @@ -1,4 +1,5 @@ mod proxy_header; +mod s2s_socket; pub use proxy_header::ProxyServerHeader; diff --git a/identity/src/fed/s2s_socket.rs b/identity/src/fed/s2s_socket.rs new file mode 100644 index 0000000..91c56d3 --- /dev/null +++ b/identity/src/fed/s2s_socket.rs @@ -0,0 +1,103 @@ +use std::sync::Arc; + +use eyre::Result; +use foxchat::{ + s2s::{Dispatch, Payload}, + signature, +}; +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use reqwest::Url; +use tokio::{ + net::TcpStream, + sync::{broadcast, mpsc, RwLock}, +}; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +use crate::{app_state::AppState, model::chat_instance::ChatInstance}; + +pub enum ServerSocketMessage { + Connect, + Disconnect, +} + +pub struct SocketState { + id: String, + domain: String, + conn_count: u32, + broadcast: broadcast::Sender, +} + +impl SocketState { + pub async fn new( + state: Arc, + instance: &ChatInstance, + ) -> Result> { + let (msg_tx, msg_rx) = mpsc::channel::(10); + + let socket_state = Arc::new(RwLock::new(SocketState { + id: instance.id.clone(), + domain: instance.domain.clone(), + conn_count: 0, + broadcast: state.broadcast.clone(), + })); + + let url = Url::parse(&format!("{}/_fox/chat/ws", instance.base_url))?; + let (stream, _) = connect_async(url).await?; + + let (tx, rx) = stream.split(); + + tokio::spawn(read(socket_state.clone(), rx, msg_tx.clone())); + tokio::spawn(write(state.clone(), socket_state.clone(), tx, msg_rx)); + + Ok(msg_tx) + } +} + +async fn write( + app_state: Arc, + state: Arc>, + mut tx: SplitSink>, Message>, + msg_rx: mpsc::Receiver, +) { + // Create signature and turn it into JSON + let host = state.read().await.domain.clone(); + let (sig, date) = + signature::build_signature(&app_state.private_key, &host, "/_fox/chat/ws", None, None); + let msg = match to_json(Payload::Identify { + host: state.read().await.domain.clone(), + date: signature::format_date(date), + server: app_state.config.domain.clone(), + signature: sig, + }) { + Ok(t) => t, + Err(e) => { + tracing::error!("Error serializing IDENTIFY payload: {}", e); + tx.close().await.ok(); + return; + } + }; + + // Send IDENTIFY message + match tx.send(msg).await { + Ok(_) => {} + Err(e) => { + tracing::error!("Error sending IDENTIFY payload: {}", e); + tx.close().await.ok(); + return; + } + } +} + +async fn read( + state: Arc>, + rx: SplitStream>>, + msg_tx: mpsc::Sender, +) { +} + +fn to_json(p: Payload) -> Result { + Ok(Message::Text(serde_json::to_string(&p)?)) +} diff --git a/identity/src/http/mod.rs b/identity/src/http/mod.rs index 093e337..fc73add 100644 --- a/identity/src/http/mod.rs +++ b/identity/src/http/mod.rs @@ -7,18 +7,25 @@ mod ws; use std::sync::Arc; -use axum::{routing::{get, post}, Extension, Router}; +use axum::{ + routing::{get, post}, + Extension, Router, +}; +use foxchat::s2s::Dispatch; use sqlx::{Pool, Postgres}; use tower_http::trace::TraceLayer; use crate::{app_state::AppState, config::Config, model::instance::Instance}; pub fn new(pool: Pool, config: Config, instance: Instance) -> Router { + let (broadcast, _) = tokio::sync::broadcast::channel::(10); + let app_state = Arc::new(AppState { pool, config, public_key: instance.public_key, private_key: instance.private_key, + broadcast, }); let app = Router::new()