// Package streaming exposes the websocket endpoint that delivers streaming
// events to connected clients.
package streaming

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"sync"

	"git.sleepycat.moe/sam/mercury/internal/streaming"
	"git.sleepycat.moe/sam/mercury/web/api"
	"git.sleepycat.moe/sam/mercury/web/app"
	"github.com/gorilla/websocket"
	"github.com/rs/zerolog/log"
)

// App wraps the shared web application state for the streaming handlers.
type App struct {
	*app.App
}

// New returns a streaming App backed by the given application state.
func New(app *app.App) *App {
	return &App{
		App: app,
	}
}

// upgrader performs the websocket handshake using the library's default options.
var upgrader = websocket.Upgrader{}

// Streaming upgrades the request to a websocket connection, registers a new
// streaming socket for the requesting user, and starts the reader and writer
// goroutines that service the connection.
//
// It returns nil once the connection has been handed off (or rejected with an
// error event), and a non-nil error only when the upgrade fails with something
// other than a handshake error.
func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error {
	// NOTE(review): the second return value is discarded here; this presumably
	// relies on earlier middleware having stored a token in the context — confirm.
	token, _ := app.TokenFromContext(r.Context())

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Err(err).Msg("performing websocket handshake")
		hse := websocket.HandshakeError{}
		if errors.As(err, &hse) {
			return nil // the upgrader already sent a response for us
		}
		return err
	}

	// The goroutines started below keep running after this handler returns, so
	// the socket's context is not derived from the request context.
	ctx, cancel := context.WithCancel(context.Background())
	socket, ok := app.Processor.SocketHolder.SocketsFor(token.UserID).NewSocket(ctx, cancel)
	if !ok {
		// No goroutine takes ownership of the connection or the context on this
		// path, so release both here to avoid leaking them.
		cancel()
		defer conn.Close()

		err := conn.WriteJSON(streaming.Event{
			Type: streaming.EventTypeError,
			Data: streaming.ErrorEvent{Code: api.ErrTooManyStreams, Message: "Too many streams open"},
		})
		if err != nil {
			log.Err(err).Msg("writing stream rejection message to socket")
		}
		return nil
	}

	// A websocket connection supports only one concurrent writer, and both
	// goroutines write to it (events and echoes), so they share a write lock.
	writeMu := &sync.Mutex{}
	go app.writeStream(conn, socket, writeMu)
	go app.readStream(conn, socket, writeMu)
	return nil
}

// writeStream forwards events from the socket's channel to the websocket
// connection until the socket is done or a write fails. It closes the
// connection on exit. writeMu must be held around every write to conn.
func (app *App) writeStream(conn *websocket.Conn, socket *streaming.Socket, writeMu *sync.Mutex) {
	defer conn.Close()

	for {
		select {
		case <-socket.Done():
			return
		case ev := <-socket.Chan():
			// at this point, the type should already have been filtered, so just send the event
			writeMu.Lock()
			err := conn.WriteJSON(ev)
			writeMu.Unlock()
			if err != nil {
				// write failed, bail and make client reconnect
				log.Err(err).Msg("error writing JSON to socket")
				socket.Cancel()
				// Stop immediately; looping again could pick another queued
				// event and write to the already-broken connection.
				return
			}
		}
	}
}

// readStream reads incoming events from the websocket connection and applies
// subscribe/unsubscribe requests to the socket until the socket is done or a
// read fails. If the socket accepts echo events, every successfully read
// event is written back to the client. writeMu must be held around every
// write to conn.
func (app *App) readStream(conn *websocket.Conn, socket *streaming.Socket, writeMu *sync.Mutex) {
	for {
		select {
		case <-socket.Done():
			return
		default:
			var e streaming.IncomingEvent
			err := conn.ReadJSON(&e)
			if err != nil {
				// read failed, bail and make client reconnect
				log.Err(err).Msg("error reading JSON from socket")
				socket.Cancel()
				return
			}

			switch e.Type {
			case streaming.EventTypeSubscribe, streaming.EventTypeUnsubscribe:
				var et streaming.EventType
				err = json.Unmarshal(e.Data, &et)
				if err != nil {
					// invalid event type, log but don't disconnect
					log.Err(err).Msg("reading event type to subscribe to")
					continue
				}

				if !et.ValidReceive() {
					log.Debug().Int("event", int(et)).Msg("invalid event for subscription")
					// if it's not a valid event, ignore silently
					continue
				}

				subscribe := e.Type == streaming.EventTypeSubscribe
				socket.SetEvent(et, subscribe)
				log.Debug().
					Int("event", int(et)).
					Bool("subscribed", subscribe).
					Msg("toggled subscription status for event")
			}

			if socket.WillAcceptEvent(streaming.EventTypeEcho) {
				writeMu.Lock()
				err = conn.WriteJSON(streaming.Event{
					Type: streaming.EventTypeEcho,
					Data: e,
				})
				writeMu.Unlock()
				if err != nil {
					// echo is best-effort: log the failure but keep reading;
					// a broken connection will surface on the next read
					log.Err(err).Msg("error writing echo event to socket")
				}
			}
		}
	}
}