foxfiles/foxfiles/blueprints/api/user.py

89 lines
2.4 KiB
Python

# SPDX-License-Identifier: Apache-2.0
from flask import Blueprint, g
from flask_pydantic import validate
from pydantic import BaseModel
from sqlalchemy import select, func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from foxfiles.errors import Errors
from foxfiles.db import User, engine
from foxfiles.user import maybe_token
# Blueprint that the user routes in this module (POST /api/users and
# POST /api/users/login) are registered on.
bp = Blueprint("user_api", __name__)
class CreateUserRequest(BaseModel):
    """Request body accepted by ``create_user`` (``POST /api/users``)."""

    # Username passed to the ``User`` constructor.
    username: str
    # Password passed to the ``User`` constructor exactly as received.
    password: str
    # Requested admin flag (defaults to False). ``create_user`` ignores it and
    # forces True when no users exist yet.
    is_admin: bool = False
class UserResponse(BaseModel):
    """Response body returned by ``create_user`` and ``login`` on success."""

    # Copied from ``User.id``.
    id: int
    # Copied from ``User.username``.
    username: str
    # Copied from ``User.is_admin``.
    is_admin: bool
    # Value returned by ``User.get_token()``; its format is not visible here.
    token: str
def _created_user_response(user: User) -> UserResponse:
    """Build the ``UserResponse`` for ``user``, with a token from ``get_token()``.

    Must be called while the session that loaded or committed ``user`` is
    still open, because the attributes are read from the ORM instance.
    """
    return UserResponse(
        id=user.id,
        username=user.username,
        is_admin=user.is_admin,
        token=user.get_token(),
    )


@bp.post("/api/users")
@maybe_token
@validate()
def create_user(body: CreateUserRequest):
    """Create a user account and return it together with a token.

    When the user table is empty, the account is created unconditionally and
    always as an admin (``body.is_admin`` is ignored). Otherwise the caller
    must be present on ``g`` as ``g.user`` and be an admin.

    Args:
        body: Validated request payload supplied by ``@validate()``.

    Returns:
        ``UserResponse`` on success; ``Errors.UNAUTHORIZED`` when ``g`` has no
        ``user``; ``Errors.FORBIDDEN`` when ``g.user`` is not an admin; or a
        ``({"error": ...}, 400)`` tuple when the insert raises
        ``IntegrityError``.
    """
    # NOTE(review): ``g.user`` is presumably populated by ``@maybe_token``
    # when a valid token accompanies the request -- confirm in foxfiles.user.
    with Session(engine) as db:
        existing_users = db.scalar(select(func.count(User.id)))

        if existing_users == 0:
            # No users yet: create this one as admin and return right away.
            # NOTE(review): an IntegrityError here is not caught, and the
            # count-then-insert sequence is not atomic across requests.
            founder = User(body.username, body.password, True)
            db.add(founder)
            db.commit()
            return _created_user_response(founder)

        # Users already exist, so only an authenticated admin may add more.
        if "user" not in g:
            return Errors.UNAUTHORIZED
        if not g.user.is_admin:
            return Errors.FORBIDDEN

        try:
            account = User(body.username, body.password, body.is_admin)
            db.add(account)
            db.commit()
        except IntegrityError as exc:
            # NOTE(review): ``_message`` is a private SQLAlchemy method; the
            # text it returns is sent to the client verbatim.
            return {"error": exc._message()}, 400
        else:
            return _created_user_response(account)
class LoginRequest(BaseModel):
    """Request body accepted by ``login`` (``POST /api/users/login``)."""

    # Username matched against ``User.username``.
    username: str
    # Candidate password handed to ``User.verify_password``.
    password: str
@bp.post("/api/users/login")
@validate()
def login(body: LoginRequest):
    """Check a username/password pair and return the user with a token.

    Args:
        body: Validated request payload supplied by ``@validate()``.

    Returns:
        ``UserResponse`` when a user with ``body.username`` exists and
        ``verify_password`` accepts ``body.password``; otherwise
        ``Errors.INVALID_CREDENTIALS``. An unknown username and a wrong
        password produce the same error value.
    """
    with Session(engine) as db:
        by_username = select(User).where(User.username == body.username)
        account = db.scalar(by_username)

        # The ``or`` short-circuits, so ``verify_password`` is only called
        # when a matching user was actually found.
        if account is None or not account.verify_password(body.password):
            return Errors.INVALID_CREDENTIALS

        return UserResponse(
            id=account.id,
            username=account.username,
            is_admin=account.is_admin,
            token=account.get_token(),
        )