127 lines
2.9 KiB
Go
127 lines
2.9 KiB
Go
package streaming
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
|
|
"git.sleepycat.moe/sam/mercury/internal/streaming"
|
|
"git.sleepycat.moe/sam/mercury/web/api"
|
|
"git.sleepycat.moe/sam/mercury/web/app"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type App struct {
|
|
*app.App
|
|
}
|
|
|
|
func New(app *app.App) *App {
|
|
return &App{
|
|
App: app,
|
|
}
|
|
}
|
|
|
|
// upgrader performs the HTTP-to-websocket handshake for streaming connections.
// It is the zero value, so every option is left at the library default.
var upgrader websocket.Upgrader
|
|
|
|
func (app *App) Streaming(w http.ResponseWriter, r *http.Request) error {
|
|
token, _ := app.TokenFromContext(r.Context())
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Err(err).Msg("performing websocket handshake")
|
|
hse := websocket.HandshakeError{}
|
|
if errors.As(err, &hse) {
|
|
return nil // the upgrader already sent a response for us
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
socket, ok := app.Processor.SocketHolder.SocketsFor(token.UserID).NewSocket(ctx, cancel)
|
|
if !ok {
|
|
err := conn.WriteJSON(streaming.Event{
|
|
Type: streaming.EventTypeError,
|
|
Data: streaming.ErrorEvent{Code: api.ErrTooManyStreams, Message: "Too many streams open"},
|
|
})
|
|
if err != nil {
|
|
log.Err(err).Msg("writing stream rejection message to socket")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
go app.writeStream(conn, socket)
|
|
go app.readStream(conn, socket)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (app *App) writeStream(conn *websocket.Conn, socket *streaming.Socket) {
|
|
defer conn.Close()
|
|
|
|
for {
|
|
select {
|
|
case <-socket.Done():
|
|
return
|
|
case ev := <-socket.Chan():
|
|
// at this point, the type should already have been filtered, so just send the event
|
|
err := conn.WriteJSON(ev)
|
|
if err != nil {
|
|
// write failed, bail and make client reconnect
|
|
log.Err(err).Msg("error writing JSON to socket")
|
|
socket.Cancel()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (app *App) readStream(conn *websocket.Conn, socket *streaming.Socket) {
|
|
for {
|
|
select {
|
|
case <-socket.Done():
|
|
return
|
|
default:
|
|
var e streaming.IncomingEvent
|
|
err := conn.ReadJSON(&e)
|
|
if err != nil {
|
|
// read failed, bail and make client reconnect
|
|
log.Err(err).Msg("error reading JSON from socket")
|
|
socket.Cancel()
|
|
return
|
|
}
|
|
|
|
switch e.Type {
|
|
case streaming.EventTypeSubscribe, streaming.EventTypeUnsubscribe:
|
|
var et streaming.EventType
|
|
err = json.Unmarshal(e.Data, &et)
|
|
if err != nil {
|
|
// invalid event type, log but don't disconnect
|
|
log.Err(err).Msg("reading event type to subscribe to")
|
|
continue
|
|
}
|
|
|
|
if !et.ValidReceive() {
|
|
log.Debug().Int("event", int(et)).Msg("invalid event for subscription")
|
|
|
|
// if it's not a valid event, ignore silently
|
|
continue
|
|
}
|
|
|
|
socket.SetEvent(et, e.Type == streaming.EventTypeSubscribe)
|
|
log.Debug().
|
|
Int("event", int(et)).
|
|
Bool("subscribed", e.Type == streaming.EventTypeSubscribe).
|
|
Msg("toggled subscription status for event")
|
|
}
|
|
|
|
if socket.WillAcceptEvent(streaming.EventTypeEcho) {
|
|
conn.WriteJSON(streaming.Event{
|
|
Type: streaming.EventTypeEcho,
|
|
Data: e,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|