Compare commits

...

1 commit
main ... wip-ws

Author SHA1 Message Date
sam
b5305ddd5b more websocket stuff 2024-03-11 22:16:52 +01:00
6 changed files with 130 additions and 1 deletions

14
Cargo.lock generated
View file

@ -582,6 +582,19 @@ dependencies = [
"typenum",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.5.0"
@ -1170,6 +1183,7 @@ dependencies = [
"chrono",
"clap",
"color-eyre",
"dashmap",
"eyre",
"foxchat",
"futures",

View file

@ -29,3 +29,4 @@ reqwest = { version = "0.11.23", features = ["json", "gzip", "brotli", "multipar
chrono = "0.4.31"
futures = "0.3.30"
tokio-tungstenite = { version = "0.21.0", features = ["rustls"] }
dashmap = "5.5.3"

View file

@ -1,5 +1,7 @@
use foxchat::s2s::Dispatch;
use rsa::{RsaPrivateKey, RsaPublicKey};
use sqlx::{Pool, Postgres};
use tokio::sync::broadcast::Sender;
use crate::config::Config;
@ -8,4 +10,5 @@ pub struct AppState {
pub config: Config,
pub private_key: RsaPrivateKey,
pub public_key: RsaPublicKey,
pub broadcast: Sender<Dispatch>,
}

View file

@ -1,4 +1,5 @@
mod proxy_header;
mod s2s_socket;
pub use proxy_header::ProxyServerHeader;

View file

@ -0,0 +1,103 @@
use std::sync::Arc;
use eyre::Result;
use foxchat::{
s2s::{Dispatch, Payload},
signature,
};
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use reqwest::Url;
use tokio::{
net::TcpStream,
sync::{broadcast, mpsc, RwLock},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use crate::{app_state::AppState, model::chat_instance::ChatInstance};
pub enum ServerSocketMessage {
Connect,
Disconnect,
}
pub struct SocketState {
id: String,
domain: String,
conn_count: u32,
broadcast: broadcast::Sender<Dispatch>,
}
impl SocketState {
pub async fn new(
state: Arc<AppState>,
instance: &ChatInstance,
) -> Result<mpsc::Sender<ServerSocketMessage>> {
let (msg_tx, msg_rx) = mpsc::channel::<ServerSocketMessage>(10);
let socket_state = Arc::new(RwLock::new(SocketState {
id: instance.id.clone(),
domain: instance.domain.clone(),
conn_count: 0,
broadcast: state.broadcast.clone(),
}));
let url = Url::parse(&format!("{}/_fox/chat/ws", instance.base_url))?;
let (stream, _) = connect_async(url).await?;
let (tx, rx) = stream.split();
tokio::spawn(read(socket_state.clone(), rx, msg_tx.clone()));
tokio::spawn(write(state.clone(), socket_state.clone(), tx, msg_rx));
Ok(msg_tx)
}
}
async fn write(
app_state: Arc<AppState>,
state: Arc<RwLock<SocketState>>,
mut tx: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
msg_rx: mpsc::Receiver<ServerSocketMessage>,
) {
// Create signature and turn it into JSON
let host = state.read().await.domain.clone();
let (sig, date) =
signature::build_signature(&app_state.private_key, &host, "/_fox/chat/ws", None, None);
let msg = match to_json(Payload::Identify {
host: state.read().await.domain.clone(),
date: signature::format_date(date),
server: app_state.config.domain.clone(),
signature: sig,
}) {
Ok(t) => t,
Err(e) => {
tracing::error!("Error serializing IDENTIFY payload: {}", e);
tx.close().await.ok();
return;
}
};
// Send IDENTIFY message
match tx.send(msg).await {
Ok(_) => {}
Err(e) => {
tracing::error!("Error sending IDENTIFY payload: {}", e);
tx.close().await.ok();
return;
}
}
}
async fn read(
state: Arc<RwLock<SocketState>>,
rx: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
msg_tx: mpsc::Sender<ServerSocketMessage>,
) {
}
fn to_json(p: Payload) -> Result<Message> {
Ok(Message::Text(serde_json::to_string(&p)?))
}

View file

@ -7,18 +7,25 @@ mod ws;
use std::sync::Arc;
use axum::{routing::{get, post}, Extension, Router};
use axum::{
routing::{get, post},
Extension, Router,
};
use foxchat::s2s::Dispatch;
use sqlx::{Pool, Postgres};
use tower_http::trace::TraceLayer;
use crate::{app_state::AppState, config::Config, model::instance::Instance};
pub fn new(pool: Pool<Postgres>, config: Config, instance: Instance) -> Router {
let (broadcast, _) = tokio::sync::broadcast::channel::<Dispatch>(10);
let app_state = Arc::new(AppState {
pool,
config,
public_key: instance.public_key,
private_key: instance.private_key,
broadcast,
});
let app = Router::new()