102 lines
2.3 KiB
Go
102 lines
2.3 KiB
Go
// Package sql implements the SQL storage layer.
|
|
package sql
|
|
|
|
import (
|
|
"context"
|
|
|
|
"emperror.dev/errors"
|
|
"git.sleepycat.moe/sam/mercury/internal/database"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/oklog/ulid/v2"
|
|
)
|
|
|
|
// Base is the base database pool used by storage layers.
|
|
type Base struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// NewBase creates a new instance of Base with the specified connection string.
|
|
func NewBase(ctx context.Context, connString string) (*Base, error) {
|
|
pool, err := pgxpool.New(ctx, connString)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "creating pool")
|
|
}
|
|
|
|
base := &Base{
|
|
pool: pool,
|
|
}
|
|
|
|
// create configuration
|
|
if err := base.initSingletons(ctx); err != nil {
|
|
return nil, errors.Wrap(err, "initializing configuration")
|
|
}
|
|
|
|
return base, nil
|
|
}
|
|
|
|
func (base *Base) initSingletons(ctx context.Context) error {
|
|
err := NewConfigStore(base.pool).initConfig(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, "initializing configuration")
|
|
}
|
|
|
|
cfg, err := NewConfigStore(base.pool).Get(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, "getting configuration")
|
|
}
|
|
|
|
if cfg.InternalApplication == nil {
|
|
tx, err := base.BeginTx(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, "creating transaction")
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
app, err := NewTokenStore(tx).CreateApplication(ctx, database.InternalApplicationName)
|
|
if err != nil {
|
|
return errors.Wrap(err, "creating internal application")
|
|
}
|
|
|
|
newCfg := cfg
|
|
newCfg.InternalApplication = &app.ID
|
|
|
|
cfg, err = NewConfigStore(tx).Set(ctx, cfg, newCfg)
|
|
if err != nil {
|
|
return errors.Wrap(err, "updating configuration")
|
|
}
|
|
|
|
err = tx.Commit(ctx)
|
|
if err != nil {
|
|
return errors.Wrap(err, "committing transaction")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Acquire acquires a connection from the database pool.
|
|
// It is the caller's responsibility to call the Release method.
|
|
func (base *Base) Acquire(ctx context.Context) (ReleaseableQuerier, error) {
|
|
conn, err := base.pool.Acquire(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "acquiring connection")
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
func (base *Base) BeginTx(ctx context.Context) (Tx, error) {
|
|
tx, err := base.pool.Begin(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "beginning transaction")
|
|
}
|
|
|
|
return tx, nil
|
|
}
|
|
|
|
func (base *Base) PoolQuerier() Querier {
|
|
return base.pool
|
|
}
|
|
|
|
func makeULID() ulid.ULID {
|
|
return ulid.Make()
|
|
}
|