86 lines
2 KiB
Go
86 lines
2 KiB
Go
package processor
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"git.sleepycat.moe/sam/mercury/internal/database"
|
|
"git.sleepycat.moe/sam/mercury/internal/database/sql"
|
|
"git.sleepycat.moe/sam/mercury/internal/streaming"
|
|
"git.sleepycat.moe/sam/mercury/web/api"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type Processor struct {
|
|
SocketHolder *streaming.SocketHolder
|
|
|
|
db *sql.Base
|
|
}
|
|
|
|
func New(db *sql.Base) *Processor {
|
|
p := &Processor{
|
|
SocketHolder: streaming.NewSocketHolder(),
|
|
|
|
db: db,
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *Processor) HandlePost(post database.Post) {
|
|
// this function is spun off in a separate goroutine, so we shouldn't use the same context as the parent request
|
|
// TODO: make timeout configurable?
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
|
defer cancel()
|
|
|
|
conn, err := p.db.Acquire(ctx)
|
|
if err != nil {
|
|
log.Err(err).Msg("acquiring database")
|
|
return
|
|
}
|
|
defer conn.Release()
|
|
|
|
// get the blog and their followers
|
|
blog, err := sql.NewBlogStore(conn).ByID(ctx, post.BlogID)
|
|
if err != nil {
|
|
log.Err(err).Msg("getting blog")
|
|
return
|
|
}
|
|
acct, err := sql.NewAccountStore(conn).ByID(ctx, blog.AccountID)
|
|
if err != nil {
|
|
log.Err(err).Msg("getting account")
|
|
return
|
|
}
|
|
followers, err := sql.NewBlogStore(conn).Followers(ctx, blog.ID)
|
|
if err != nil {
|
|
log.Err(err).Msg("getting followers")
|
|
return
|
|
}
|
|
|
|
p.handlePostLocal(ctx, post, blog, acct, followers)
|
|
}
|
|
|
|
// handlePostLocal handles a post on the local side--mostly sending notifications to websockets.
|
|
// All posts go through this function.
|
|
func (p *Processor) handlePostLocal(
|
|
ctx context.Context, post database.Post,
|
|
blog database.Blog, acct database.Account,
|
|
followers []sql.BlogFollower,
|
|
) {
|
|
// send to self
|
|
apiPost := api.DBPostToPost(post, blog, acct)
|
|
p.SocketHolder.Send(acct.ID, streaming.EventTypePost, apiPost)
|
|
|
|
// send to followers
|
|
if post.Visibility != database.DirectVisibility {
|
|
for _, follower := range followers {
|
|
if !follower.IsLocal {
|
|
continue
|
|
}
|
|
|
|
p.SocketHolder.Send(follower.AccountID, streaming.EventTypePost, apiPost)
|
|
}
|
|
}
|
|
|
|
// TODO: send to mentions
|
|
}
|