mercury/web/api/streaming/module.go

127 lines
2.9 KiB
Go

package streaming
import (
"context"
"encoding/json"
"errors"
"net/http"
"git.sleepycat.moe/sam/mercury/internal/streaming"
"git.sleepycat.moe/sam/mercury/web/api"
"git.sleepycat.moe/sam/mercury/web/app"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
)
// App is the streaming API module. It embeds the shared *app.App so the
// handlers defined in this package can reach the application's services directly.
type App struct {
*app.App
}
// New returns a streaming App that wraps the given application.
func New(app *app.App) *App {
	module := new(App)
	module.App = app
	return module
}
// upgrader performs the websocket handshake for Streaming. It is a zero-value
// websocket.Upgrader, so every option is left at the library's default.
var upgrader = websocket.Upgrader{}
// Streaming upgrades the request to a websocket connection and attaches a new
// streaming socket for the user the request's token belongs to.
//
// If the handshake fails with a websocket.HandshakeError, nil is returned
// because the upgrader has already written the HTTP response; any other upgrade
// error is returned to the caller. If no socket can be allocated for the user,
// an error event is written to the client, the connection is closed, and nil is
// returned. On success the read and write loops take over the connection in
// their own goroutines and the handler returns nil.
func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error {
	// NOTE(review): the second result of TokenFromContext is discarded;
	// presumably upstream middleware guarantees a token is present — confirm,
	// since token.UserID is used unconditionally below.
	token, _ := app.TokenFromContext(r.Context())

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Err(err).Msg("performing websocket handshake")
		hse := websocket.HandshakeError{}
		if errors.As(err, &hse) {
			return nil // the upgrader already sent a response for us
		}
		return err
	}

	// The socket outlives this handler (the goroutines below keep running after
	// it returns), so its context is deliberately not derived from r.Context().
	ctx, cancel := context.WithCancel(context.Background())
	socket, ok := app.Processor.SocketHolder.SocketsFor(token.UserID).NewSocket(ctx, cancel)
	if !ok {
		// No socket was created, so no read/write loop will ever release the
		// context or the connection: do it here to avoid leaking both.
		cancel()
		defer conn.Close()

		if werr := conn.WriteJSON(streaming.Event{
			Type: streaming.EventTypeError,
			Data: streaming.ErrorEvent{Code: api.ErrTooManyStreams, Message: "Too many streams open"},
		}); werr != nil {
			log.Err(werr).Msg("writing stream rejection message to socket")
		}
		return nil
	}

	go app.writeStream(conn, socket)
	go app.readStream(conn, socket)
	return nil
}
// writeStream forwards events from the socket's channel to the websocket
// connection as JSON until the socket is done or a write fails. The connection
// is closed when the loop exits.
func (app *App) writeStream(conn *websocket.Conn, socket *streaming.Socket) {
	defer conn.Close()

	for {
		select {
		case <-socket.Done():
			return
		case ev := <-socket.Chan():
			// at this point, the type should already have been filtered, so just send the event
			if err := conn.WriteJSON(ev); err != nil {
				// write failed, bail and make client reconnect
				log.Err(err).Msg("error writing JSON to socket")
				socket.Cancel()
				// Stop right away instead of looping again: the select could
				// otherwise pick another event and write to the broken connection.
				return
			}
		}
	}
}
// readStream reads incoming JSON events from the websocket connection until the
// socket is done or a read fails. Subscribe and unsubscribe events toggle the
// socket's subscription for the event type carried in their data. Every
// successfully handled message is echoed back if the socket accepts echo events.
// Any read failure, or a failure to write the echo, cancels the socket.
func (app *App) readStream(conn *websocket.Conn, socket *streaming.Socket) {
	for {
		// Non-blocking check: stop before starting another read once the socket is done.
		select {
		case <-socket.Done():
			return
		default:
		}

		var e streaming.IncomingEvent
		if err := conn.ReadJSON(&e); err != nil {
			// read failed, bail and make client reconnect
			log.Err(err).Msg("error reading JSON from socket")
			socket.Cancel()
			return
		}

		switch e.Type {
		case streaming.EventTypeSubscribe, streaming.EventTypeUnsubscribe:
			var et streaming.EventType
			if err := json.Unmarshal(e.Data, &et); err != nil {
				// invalid event type, log but don't disconnect
				log.Err(err).Msg("reading event type to subscribe to")
				continue
			}
			if !et.ValidReceive() {
				log.Debug().Int("event", int(et)).Msg("invalid event for subscription")
				// not a valid event to subscribe to: skip it without disconnecting
				continue
			}

			subscribe := e.Type == streaming.EventTypeSubscribe
			socket.SetEvent(et, subscribe)
			log.Debug().
				Int("event", int(et)).
				Bool("subscribed", subscribe).
				Msg("toggled subscription status for event")
		}

		if socket.WillAcceptEvent(streaming.EventTypeEcho) {
			// NOTE(review): this write is issued from the reader goroutine;
			// gorilla/websocket supports only one concurrent writer per
			// connection, so confirm it cannot overlap with writes made by
			// another goroutine on the same connection.
			err := conn.WriteJSON(streaming.Event{
				Type: streaming.EventTypeEcho,
				Data: e,
			})
			if err != nil {
				// write failed, bail and make client reconnect
				log.Err(err).Msg("error writing echo event to socket")
				socket.Cancel()
				return
			}
		}
	}
}