diff --git a/internal/processor/processor.go b/internal/processor/processor.go new file mode 100644 index 0000000..eedf4ed --- /dev/null +++ b/internal/processor/processor.go @@ -0,0 +1,27 @@ +package processor + +import ( + "git.sleepycat.moe/sam/mercury/internal/database" + "git.sleepycat.moe/sam/mercury/internal/database/sql" + "git.sleepycat.moe/sam/mercury/internal/streaming" +) + +type Processor struct { + SocketHolder *streaming.SocketHolder + + db *sql.Base +} + +func New(db *sql.Base) *Processor { + p := &Processor{ + SocketHolder: &streaming.SocketHolder{}, + + db: db, + } + + return p +} + +func (p *Processor) HandlePost(post database.Post) { + +} diff --git a/internal/streaming/event.go b/internal/streaming/event.go new file mode 100644 index 0000000..97b73dd --- /dev/null +++ b/internal/streaming/event.go @@ -0,0 +1,57 @@ +package streaming + +import "encoding/json" + +type EventType int8 + +const ( + EventTypeError EventType = 1 + EventTypePost EventType = 2 + + EventTypeSubscribe EventType = 126 + EventTypeUnsubscribe EventType = 127 +) + +func (et EventType) Valid() bool { + switch et { + case EventTypeError: + return true + case EventTypePost: + return true + case EventTypeSubscribe: + return true + case EventTypeUnsubscribe: + return true + default: + return false + } +} + +// Returns true if this event can be subscribed to/unsubscribed from +func (et EventType) ValidReceive() bool { + if !et.Valid() { + return false + } + + switch et { + case EventTypeError, EventTypeSubscribe, EventTypeUnsubscribe: + return false + default: + return true + } +} + +type Event struct { + Type EventType `json:"t"` + Data any `json:"d"` +} + +type ErrorEvent struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type IncomingEvent struct { + Type EventType `json:"t"` + Data json.RawMessage `json:"d"` // this is a RawMessage so we can easily unmarshal it later +} diff --git a/web/api/streaming/sockets.go b/internal/streaming/sockets.go similarity index 61% rename from web/api/streaming/sockets.go rename to internal/streaming/sockets.go index ef6fa41..faf6360 100644 --- a/web/api/streaming/sockets.go +++ b/internal/streaming/sockets.go @@ -7,33 +7,31 @@ import ( "github.com/oklog/ulid/v2" ) -var SocketHolder socketHolder - -type socketHolder struct { +type SocketHolder struct { // map of sockets to sockets map[ulid.ULID]*userSockets mu sync.Mutex } -func (sh *socketHolder) Send(acctID ulid.ULID, et EventType, data any) { - userSockets := sh.socketsFor(acctID) +func (sh *SocketHolder) Send(acctID ulid.ULID, et EventType, data any) { + userSockets := sh.SocketsFor(acctID) userSockets.mu.Lock() - sockets := make([]*socket, len(userSockets.sockets)) + sockets := make([]*Socket, len(userSockets.sockets)) copy(sockets, userSockets.sockets) userSockets.mu.Unlock() for _, s := range sockets { if s.willAcceptEvent(et) { // the socket might block for a bit, so spin this off into a separate goroutine - go func(s *socket) { + go func(s *Socket) { s.ch <- Event{Type: et, Data: data} }(s) } } } -func (s *socketHolder) socketsFor(acct ulid.ULID) *userSockets { +func (s *SocketHolder) SocketsFor(acct ulid.ULID) *userSockets { s.mu.Lock() defer s.mu.Unlock() @@ -49,20 +47,21 @@ const sessionCountLimit = 50 // no more than 50 concurrent sessions per user type userSockets struct { mu sync.Mutex - sockets []*socket + sockets []*Socket } -func (s *userSockets) newSocket(ctx context.Context, cancel context.CancelFunc) (*socket, bool) { +func (s *userSockets) NewSocket(ctx context.Context, cancel context.CancelFunc) (*Socket, bool) { s.mu.Lock() + defer s.mu.Unlock() if len(s.sockets) >= sessionCountLimit { return nil, false } - socket := newSocket(ctx, cancel) + socket := NewSocket(ctx, cancel) s.sockets = append(s.sockets, socket) return socket, true } -type socket struct { +type Socket struct { ctx context.Context cancel context.CancelFunc @@ -72,7 +71,7 @@ type socket struct { mu sync.RWMutex } -func (s *socket) willAcceptEvent(mt EventType) bool { +func (s *Socket) willAcceptEvent(mt EventType) bool { if mt == EventTypeError { return true } @@ -83,7 +82,7 @@ func (s *socket) willAcceptEvent(mt EventType) bool { return ok } -func (s *socket) setEvent(mt EventType, add bool) { +func (s *Socket) SetEvent(mt EventType, add bool) { s.mu.Lock() if add { s.types[mt] = struct{}{} @@ -93,8 +92,20 @@ func (s *socket) setEvent(mt EventType, add bool) { s.mu.Unlock() } -func newSocket(ctx context.Context, cancel context.CancelFunc) *socket { - return &socket{ +func (s *Socket) Cancel() { + s.cancel() +} + +func (s *Socket) Done() <-chan struct{} { + return s.ctx.Done() +} + +func (s *Socket) Chan() <-chan Event { + return s.ch +} + +func NewSocket(ctx context.Context, cancel context.CancelFunc) *Socket { + return &Socket{ ctx: ctx, cancel: cancel, ch: make(chan Event), diff --git a/web/api/posts/create_post.go b/web/api/posts/create_post.go index 2a5b13e..03eb579 100644 --- a/web/api/posts/create_post.go +++ b/web/api/posts/create_post.go @@ -92,6 +92,6 @@ func (app *App) Create(w http.ResponseWriter, r *http.Request) (api.Post, error) return api.Post{}, err } - // TODO: federate post + push to websockets + go app.Processor.HandlePost(post) return api.DBPostToPost(post, blog, acct), nil } diff --git a/web/api/streaming/module.go b/web/api/streaming/module.go index 2402598..cc0d81d 100644 --- a/web/api/streaming/module.go +++ b/web/api/streaming/module.go @@ -6,6 +6,7 @@ import ( "errors" "net/http" + "git.sleepycat.moe/sam/mercury/internal/streaming" "git.sleepycat.moe/sam/mercury/web/api" "git.sleepycat.moe/sam/mercury/web/app" "github.com/gorilla/websocket" @@ -38,7 +39,7 @@ func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error { } ctx, cancel := context.WithCancel(context.Background()) - socket, ok := SocketHolder.socketsFor(token.UserID).newSocket(ctx, cancel) + socket, ok := app.Processor.SocketHolder.SocketsFor(token.UserID).NewSocket(ctx, cancel) if !ok { err := conn.WriteJSON(newEvent(EventTypeError, ErrorEvent{Code: api.ErrTooManyStreams, Message: "Too many streams open"})) if err != nil { @@ -54,43 +55,43 @@ func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error { return nil } -func (app *App) writeStream(conn *websocket.Conn, socket *socket) { +func (app *App) writeStream(conn *websocket.Conn, socket *streaming.Socket) { defer conn.Close() for { select { - case <-socket.ctx.Done(): + case <-socket.Done(): return - case ev := <-socket.ch: + case ev := <-socket.Chan(): // at this point, the type should already have been filtered, so just send the event err := conn.WriteJSON(ev) if err != nil { // write failed, bail and make client reconnect log.Err(err).Msg("error writing JSON to socket") - socket.cancel() + socket.Cancel() } } } } -func (app *App) readStream(conn *websocket.Conn, socket *socket) { +func (app *App) readStream(conn *websocket.Conn, socket *streaming.Socket) { for { select { - case <-socket.ctx.Done(): + case <-socket.Done(): return default: - var e IncomingEvent + var e streaming.IncomingEvent err := conn.ReadJSON(&e) if err != nil { // read failed, bail and make client reconnect log.Err(err).Msg("error reading JSON from socket") - socket.cancel() + socket.Cancel() return } switch e.Type { - case EventTypeSubscribe, EventTypeUnsubscribe: - var et EventType + case streaming.EventTypeSubscribe, streaming.EventTypeUnsubscribe: + var et streaming.EventType err = json.Unmarshal(e.Data, &et) if err != nil { // invalid event type, log but don't disconnect @@ -103,7 +104,7 @@ func (app *App) readStream(conn *websocket.Conn, socket *socket) { continue } - socket.setEvent(et, e.Type != EventTypeSubscribe) + socket.SetEvent(et, e.Type != streaming.EventTypeUnsubscribe) } } } diff --git a/web/app/app.go b/web/app/app.go index acb3b70..74da7b4 100644 --- a/web/app/app.go +++ b/web/app/app.go @@ -9,6 +9,7 @@ import ( "git.sleepycat.moe/sam/mercury/internal/concurrent" "git.sleepycat.moe/sam/mercury/internal/database" "git.sleepycat.moe/sam/mercury/internal/database/sql" + "git.sleepycat.moe/sam/mercury/internal/processor" "git.sleepycat.moe/sam/mercury/web/templates" "github.com/flosch/pongo2/v6" "github.com/go-chi/chi/v5" @@ -25,6 +26,7 @@ type App struct { AppConfig config.Config DBConfig *concurrent.Value[database.Config] Database *sql.Base + Processor *processor.Processor tmpl *pongo2.TemplateSet tokenKey []byte @@ -35,6 +37,7 @@ func NewApp(ctx context.Context, cfg config.Config, db *sql.Base) (*App, error) Router: chi.NewRouter(), AppConfig: cfg, Database: db, + Processor: processor.New(db), } if cfg.Core.SecretKey == "" {