more websocket stuff
This commit is contained in:
parent
fd77dd01fa
commit
b5305ddd5b
6 changed files with 130 additions and 1 deletions
14
Cargo.lock
generated
14
Cargo.lock
generated
|
@ -582,6 +582,19 @@ dependencies = [
|
|||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data-encoding"
|
||||
version = "2.5.0"
|
||||
|
@ -1170,6 +1183,7 @@ dependencies = [
|
|||
"chrono",
|
||||
"clap",
|
||||
"color-eyre",
|
||||
"dashmap",
|
||||
"eyre",
|
||||
"foxchat",
|
||||
"futures",
|
||||
|
|
|
@ -29,3 +29,4 @@ reqwest = { version = "0.11.23", features = ["json", "gzip", "brotli", "multipar
|
|||
chrono = "0.4.31"
|
||||
futures = "0.3.30"
|
||||
tokio-tungstenite = { version = "0.21.0", features = ["rustls"] }
|
||||
dashmap = "5.5.3"
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use foxchat::s2s::Dispatch;
|
||||
use rsa::{RsaPrivateKey, RsaPublicKey};
|
||||
use sqlx::{Pool, Postgres};
|
||||
use tokio::sync::broadcast::Sender;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
|
@ -8,4 +10,5 @@ pub struct AppState {
|
|||
pub config: Config,
|
||||
pub private_key: RsaPrivateKey,
|
||||
pub public_key: RsaPublicKey,
|
||||
pub broadcast: Sender<Dispatch>,
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
mod proxy_header;
|
||||
mod s2s_socket;
|
||||
|
||||
pub use proxy_header::ProxyServerHeader;
|
||||
|
||||
|
|
103
identity/src/fed/s2s_socket.rs
Normal file
103
identity/src/fed/s2s_socket.rs
Normal file
|
@ -0,0 +1,103 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use eyre::Result;
|
||||
use foxchat::{
|
||||
s2s::{Dispatch, Payload},
|
||||
signature,
|
||||
};
|
||||
use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use reqwest::Url;
|
||||
use tokio::{
|
||||
net::TcpStream,
|
||||
sync::{broadcast, mpsc, RwLock},
|
||||
};
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
|
||||
|
||||
use crate::{app_state::AppState, model::chat_instance::ChatInstance};
|
||||
|
||||
pub enum ServerSocketMessage {
|
||||
Connect,
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
pub struct SocketState {
|
||||
id: String,
|
||||
domain: String,
|
||||
conn_count: u32,
|
||||
broadcast: broadcast::Sender<Dispatch>,
|
||||
}
|
||||
|
||||
impl SocketState {
|
||||
pub async fn new(
|
||||
state: Arc<AppState>,
|
||||
instance: &ChatInstance,
|
||||
) -> Result<mpsc::Sender<ServerSocketMessage>> {
|
||||
let (msg_tx, msg_rx) = mpsc::channel::<ServerSocketMessage>(10);
|
||||
|
||||
let socket_state = Arc::new(RwLock::new(SocketState {
|
||||
id: instance.id.clone(),
|
||||
domain: instance.domain.clone(),
|
||||
conn_count: 0,
|
||||
broadcast: state.broadcast.clone(),
|
||||
}));
|
||||
|
||||
let url = Url::parse(&format!("{}/_fox/chat/ws", instance.base_url))?;
|
||||
let (stream, _) = connect_async(url).await?;
|
||||
|
||||
let (tx, rx) = stream.split();
|
||||
|
||||
tokio::spawn(read(socket_state.clone(), rx, msg_tx.clone()));
|
||||
tokio::spawn(write(state.clone(), socket_state.clone(), tx, msg_rx));
|
||||
|
||||
Ok(msg_tx)
|
||||
}
|
||||
}
|
||||
|
||||
async fn write(
|
||||
app_state: Arc<AppState>,
|
||||
state: Arc<RwLock<SocketState>>,
|
||||
mut tx: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
|
||||
msg_rx: mpsc::Receiver<ServerSocketMessage>,
|
||||
) {
|
||||
// Create signature and turn it into JSON
|
||||
let host = state.read().await.domain.clone();
|
||||
let (sig, date) =
|
||||
signature::build_signature(&app_state.private_key, &host, "/_fox/chat/ws", None, None);
|
||||
let msg = match to_json(Payload::Identify {
|
||||
host: state.read().await.domain.clone(),
|
||||
date: signature::format_date(date),
|
||||
server: app_state.config.domain.clone(),
|
||||
signature: sig,
|
||||
}) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
tracing::error!("Error serializing IDENTIFY payload: {}", e);
|
||||
tx.close().await.ok();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Send IDENTIFY message
|
||||
match tx.send(msg).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tracing::error!("Error sending IDENTIFY payload: {}", e);
|
||||
tx.close().await.ok();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read(
|
||||
state: Arc<RwLock<SocketState>>,
|
||||
rx: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
|
||||
msg_tx: mpsc::Sender<ServerSocketMessage>,
|
||||
) {
|
||||
}
|
||||
|
||||
fn to_json(p: Payload) -> Result<Message> {
|
||||
Ok(Message::Text(serde_json::to_string(&p)?))
|
||||
}
|
|
@ -7,18 +7,25 @@ mod ws;
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{routing::{get, post}, Extension, Router};
|
||||
use axum::{
|
||||
routing::{get, post},
|
||||
Extension, Router,
|
||||
};
|
||||
use foxchat::s2s::Dispatch;
|
||||
use sqlx::{Pool, Postgres};
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use crate::{app_state::AppState, config::Config, model::instance::Instance};
|
||||
|
||||
pub fn new(pool: Pool<Postgres>, config: Config, instance: Instance) -> Router {
|
||||
let (broadcast, _) = tokio::sync::broadcast::channel::<Dispatch>(10);
|
||||
|
||||
let app_state = Arc::new(AppState {
|
||||
pool,
|
||||
config,
|
||||
public_key: instance.public_key,
|
||||
private_key: instance.private_key,
|
||||
broadcast,
|
||||
});
|
||||
|
||||
let app = Router::new()
|
||||
|
|
Loading…
Reference in a new issue