mercury/internal/streaming/sockets.go

123 lines
2.2 KiB
Go
Raw Normal View History

package streaming
import (
"context"
"sync"
"github.com/oklog/ulid/v2"
)
2023-10-15 17:08:55 +02:00
type SocketHolder struct {
// map of sockets to
sockets map[ulid.ULID]*userSockets
mu sync.Mutex
}
func NewSocketHolder() *SocketHolder {
return &SocketHolder{
sockets: make(map[ulid.ULID]*userSockets),
}
}
2023-10-15 17:08:55 +02:00
func (sh *SocketHolder) Send(acctID ulid.ULID, et EventType, data any) {
userSockets := sh.SocketsFor(acctID)
userSockets.mu.Lock()
2023-10-15 17:08:55 +02:00
sockets := make([]*Socket, len(userSockets.sockets))
copy(sockets, userSockets.sockets)
userSockets.mu.Unlock()
for _, s := range sockets {
if s.WillAcceptEvent(et) {
// the socket might block for a bit, so spin this off into a separate goroutine
2023-10-15 17:08:55 +02:00
go func(s *Socket) {
s.ch <- Event{Type: et, Data: data}
}(s)
}
}
}
2023-10-15 17:08:55 +02:00
func (s *SocketHolder) SocketsFor(acct ulid.ULID) *userSockets {
s.mu.Lock()
defer s.mu.Unlock()
us, ok := s.sockets[acct]
if !ok {
us = &userSockets{}
s.sockets[acct] = us
}
return us
}
const sessionCountLimit = 50 // no more than 50 concurrent sessions per user
type userSockets struct {
mu sync.Mutex
2023-10-15 17:08:55 +02:00
sockets []*Socket
}
2023-10-15 17:08:55 +02:00
func (s *userSockets) NewSocket(ctx context.Context, cancel context.CancelFunc) (*Socket, bool) {
s.mu.Lock()
2023-10-15 17:08:55 +02:00
defer s.mu.Unlock()
if len(s.sockets) >= sessionCountLimit {
return nil, false
}
2023-10-15 17:08:55 +02:00
socket := NewSocket(ctx, cancel)
s.sockets = append(s.sockets, socket)
return socket, true
}
2023-10-15 17:08:55 +02:00
type Socket struct {
ctx context.Context
cancel context.CancelFunc
ch chan Event
types map[EventType]struct{}
mu sync.RWMutex
}
func (s *Socket) WillAcceptEvent(mt EventType) bool {
if mt == EventTypeError {
return true
}
s.mu.RLock()
_, ok := s.types[mt]
s.mu.RUnlock()
return ok
}
2023-10-15 17:08:55 +02:00
func (s *Socket) SetEvent(mt EventType, add bool) {
s.mu.Lock()
if add {
s.types[mt] = struct{}{}
} else {
delete(s.types, mt)
}
s.mu.Unlock()
}
2023-10-15 17:08:55 +02:00
func (s *Socket) Cancel() {
s.cancel()
}
func (s *Socket) Done() <-chan struct{} {
return s.ctx.Done()
}
func (s *Socket) Chan() <-chan Event {
return s.ch
}
func NewSocket(ctx context.Context, cancel context.CancelFunc) *Socket {
return &Socket{
ctx: ctx,
cancel: cancel,
ch: make(chan Event),
types: map[EventType]struct{}{
EventTypeEcho: {},
},
}
}