add internal/{processor,streaming}

This commit is contained in:
sam 2023-10-15 17:08:55 +02:00
parent 6f17b59a47
commit 5c6da51234
Signed by: sam
GPG key ID: B4EF20DDE721CAA1
6 changed files with 128 additions and 29 deletions

View file

@ -0,0 +1,27 @@
package processor
import (
"git.sleepycat.moe/sam/mercury/internal/database"
"git.sleepycat.moe/sam/mercury/internal/database/sql"
"git.sleepycat.moe/sam/mercury/internal/streaming"
)
type Processor struct {
SocketHolder *streaming.SocketHolder
db *sql.Base
}
func New(db *sql.Base) *Processor {
p := &Processor{
SocketHolder: &streaming.SocketHolder{},
db: db,
}
return p
}
func (p *Processor) HandlePost(post database.Post) {
}

View file

@ -0,0 +1,57 @@
package streaming
import "encoding/json"
type EventType int8
const (
EventTypeError EventType = 1
EventTypePost EventType = 2
EventTypeSubscribe EventType = 126
EventTypeUnsubscribe EventType = 127
)
func (et EventType) Valid() bool {
switch et {
case EventTypeError:
return true
case EventTypePost:
return true
case EventTypeSubscribe:
return true
case EventTypeUnsubscribe:
return true
default:
return false
}
}
// Returns true if this event can be subscribed to/unsubscribed from
func (et EventType) ValidReceive() bool {
if !et.Valid() {
return false
}
switch et {
case EventTypeError, EventTypeSubscribe, EventTypeUnsubscribe:
return false
default:
return true
}
}
type Event struct {
Type EventType `json:"t"`
Data any `json:"d"`
}
type ErrorEvent struct {
Code int `json:"code"`
Message string `json:"message"`
}
type IncomingEvent struct {
Type EventType `json:"t"`
Data json.RawMessage `json:"d"` // this is a RawMessage so we can easily unmarshal it later
}

View file

@ -7,33 +7,31 @@ import (
"github.com/oklog/ulid/v2" "github.com/oklog/ulid/v2"
) )
var SocketHolder socketHolder type SocketHolder struct {
type socketHolder struct {
// map of sockets to // map of sockets to
sockets map[ulid.ULID]*userSockets sockets map[ulid.ULID]*userSockets
mu sync.Mutex mu sync.Mutex
} }
func (sh *socketHolder) Send(acctID ulid.ULID, et EventType, data any) { func (sh *SocketHolder) Send(acctID ulid.ULID, et EventType, data any) {
userSockets := sh.socketsFor(acctID) userSockets := sh.SocketsFor(acctID)
userSockets.mu.Lock() userSockets.mu.Lock()
sockets := make([]*socket, len(userSockets.sockets)) sockets := make([]*Socket, len(userSockets.sockets))
copy(sockets, userSockets.sockets) copy(sockets, userSockets.sockets)
userSockets.mu.Unlock() userSockets.mu.Unlock()
for _, s := range sockets { for _, s := range sockets {
if s.willAcceptEvent(et) { if s.willAcceptEvent(et) {
// the socket might block for a bit, so spin this off into a separate goroutine // the socket might block for a bit, so spin this off into a separate goroutine
go func(s *socket) { go func(s *Socket) {
s.ch <- Event{Type: et, Data: data} s.ch <- Event{Type: et, Data: data}
}(s) }(s)
} }
} }
} }
func (s *socketHolder) socketsFor(acct ulid.ULID) *userSockets { func (s *SocketHolder) SocketsFor(acct ulid.ULID) *userSockets {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -49,20 +47,21 @@ const sessionCountLimit = 50 // no more than 50 concurrent sessions per user
type userSockets struct { type userSockets struct {
mu sync.Mutex mu sync.Mutex
sockets []*socket sockets []*Socket
} }
func (s *userSockets) newSocket(ctx context.Context, cancel context.CancelFunc) (*socket, bool) { func (s *userSockets) NewSocket(ctx context.Context, cancel context.CancelFunc) (*Socket, bool) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
if len(s.sockets) >= sessionCountLimit { if len(s.sockets) >= sessionCountLimit {
return nil, false return nil, false
} }
socket := newSocket(ctx, cancel) socket := NewSocket(ctx, cancel)
s.sockets = append(s.sockets, socket) s.sockets = append(s.sockets, socket)
return socket, true return socket, true
} }
type socket struct { type Socket struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -72,7 +71,7 @@ type socket struct {
mu sync.RWMutex mu sync.RWMutex
} }
func (s *socket) willAcceptEvent(mt EventType) bool { func (s *Socket) willAcceptEvent(mt EventType) bool {
if mt == EventTypeError { if mt == EventTypeError {
return true return true
} }
@ -83,7 +82,7 @@ func (s *socket) willAcceptEvent(mt EventType) bool {
return ok return ok
} }
func (s *socket) setEvent(mt EventType, add bool) { func (s *Socket) SetEvent(mt EventType, add bool) {
s.mu.Lock() s.mu.Lock()
if add { if add {
s.types[mt] = struct{}{} s.types[mt] = struct{}{}
@ -93,8 +92,20 @@ func (s *socket) setEvent(mt EventType, add bool) {
s.mu.Unlock() s.mu.Unlock()
} }
func newSocket(ctx context.Context, cancel context.CancelFunc) *socket { func (s *Socket) Cancel() {
return &socket{ s.cancel()
}
func (s *Socket) Done() <-chan struct{} {
return s.ctx.Done()
}
func (s *Socket) Chan() <-chan Event {
return s.ch
}
func NewSocket(ctx context.Context, cancel context.CancelFunc) *Socket {
return &Socket{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
ch: make(chan Event), ch: make(chan Event),

View file

@ -92,6 +92,6 @@ func (app *App) Create(w http.ResponseWriter, r *http.Request) (api.Post, error)
return api.Post{}, err return api.Post{}, err
} }
// TODO: federate post + push to websockets go app.Processor.HandlePost(post)
return api.DBPostToPost(post, blog, acct), nil return api.DBPostToPost(post, blog, acct), nil
} }

View file

@ -6,6 +6,7 @@ import (
"errors" "errors"
"net/http" "net/http"
"git.sleepycat.moe/sam/mercury/internal/streaming"
"git.sleepycat.moe/sam/mercury/web/api" "git.sleepycat.moe/sam/mercury/web/api"
"git.sleepycat.moe/sam/mercury/web/app" "git.sleepycat.moe/sam/mercury/web/app"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -38,7 +39,7 @@ func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error {
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
socket, ok := SocketHolder.socketsFor(token.UserID).newSocket(ctx, cancel) socket, ok := app.Processor.SocketHolder.SocketsFor(token.UserID).NewSocket(ctx, cancel)
if !ok { if !ok {
err := conn.WriteJSON(newEvent(EventTypeError, ErrorEvent{Code: api.ErrTooManyStreams, Message: "Too many streams open"})) err := conn.WriteJSON(newEvent(EventTypeError, ErrorEvent{Code: api.ErrTooManyStreams, Message: "Too many streams open"}))
if err != nil { if err != nil {
@ -54,43 +55,43 @@ func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error {
return nil return nil
} }
func (app *App) writeStream(conn *websocket.Conn, socket *socket) { func (app *App) writeStream(conn *websocket.Conn, socket *streaming.Socket) {
defer conn.Close() defer conn.Close()
for { for {
select { select {
case <-socket.ctx.Done(): case <-socket.Done():
return return
case ev := <-socket.ch: case ev := <-socket.Chan():
// at this point, the type should already have been filtered, so just send the event // at this point, the type should already have been filtered, so just send the event
err := conn.WriteJSON(ev) err := conn.WriteJSON(ev)
if err != nil { if err != nil {
// write failed, bail and make client reconnect // write failed, bail and make client reconnect
log.Err(err).Msg("error writing JSON to socket") log.Err(err).Msg("error writing JSON to socket")
socket.cancel() socket.Cancel()
} }
} }
} }
} }
func (app *App) readStream(conn *websocket.Conn, socket *socket) { func (app *App) readStream(conn *websocket.Conn, socket *streaming.Socket) {
for { for {
select { select {
case <-socket.ctx.Done(): case <-socket.Done():
return return
default: default:
var e IncomingEvent var e streaming.IncomingEvent
err := conn.ReadJSON(&e) err := conn.ReadJSON(&e)
if err != nil { if err != nil {
// read failed, bail and make client reconnect // read failed, bail and make client reconnect
log.Err(err).Msg("error reading JSON from socket") log.Err(err).Msg("error reading JSON from socket")
socket.cancel() socket.Cancel()
return return
} }
switch e.Type { switch e.Type {
case EventTypeSubscribe, EventTypeUnsubscribe: case streaming.EventTypeSubscribe, streaming.EventTypeUnsubscribe:
var et EventType var et streaming.EventType
err = json.Unmarshal(e.Data, &et) err = json.Unmarshal(e.Data, &et)
if err != nil { if err != nil {
// invalid event type, log but don't disconnect // invalid event type, log but don't disconnect
@ -103,7 +104,7 @@ func (app *App) readStream(conn *websocket.Conn, socket *socket) {
continue continue
} }
socket.setEvent(et, e.Type != EventTypeSubscribe) socket.SetEvent(et, e.Type != streaming.EventTypeUnsubscribe)
} }
} }
} }

View file

@ -9,6 +9,7 @@ import (
"git.sleepycat.moe/sam/mercury/internal/concurrent" "git.sleepycat.moe/sam/mercury/internal/concurrent"
"git.sleepycat.moe/sam/mercury/internal/database" "git.sleepycat.moe/sam/mercury/internal/database"
"git.sleepycat.moe/sam/mercury/internal/database/sql" "git.sleepycat.moe/sam/mercury/internal/database/sql"
"git.sleepycat.moe/sam/mercury/internal/processor"
"git.sleepycat.moe/sam/mercury/web/templates" "git.sleepycat.moe/sam/mercury/web/templates"
"github.com/flosch/pongo2/v6" "github.com/flosch/pongo2/v6"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@ -25,6 +26,7 @@ type App struct {
AppConfig config.Config AppConfig config.Config
DBConfig *concurrent.Value[database.Config] DBConfig *concurrent.Value[database.Config]
Database *sql.Base Database *sql.Base
Processor *processor.Processor
tmpl *pongo2.TemplateSet tmpl *pongo2.TemplateSet
tokenKey []byte tokenKey []byte
@ -35,6 +37,7 @@ func NewApp(ctx context.Context, cfg config.Config, db *sql.Base) (*App, error)
Router: chi.NewRouter(), Router: chi.NewRouter(),
AppConfig: cfg, AppConfig: cfg,
Database: db, Database: db,
Processor: processor.New(db),
} }
if cfg.Core.SecretKey == "" { if cfg.Core.SecretKey == "" {