"""Service-layer helpers for creating, looking up and deleting users."""

from datetime import date
from typing import Optional

from passlib.context import CryptContext
from sqlalchemy import or_
from sqlalchemy.orm import Session

from app.user.model import User
from app.role import service as role_service

# Shared password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"])


def create_user(db: Session, user: User) -> User:
    """Persist *user* after hashing its plain-text password.

    The ``user.password`` attribute is overwritten in place with its hash
    before the row is added and committed.

    Args:
        db: Session used for the lookup and the insert.
        user: Unsaved user whose ``password`` holds the raw password.

    Returns:
        The same ``user`` instance, refreshed from the database.

    Raises:
        ValueError: If another user already has the same username or email.
    """
    existing_user = get_user_by_username_or_email(db, user.username, user.email)
    if existing_user is not None:
        raise ValueError(
            "A user with the same username or email already exists"
        )

    user.password = pwd_context.hash(user.password)
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def get_user_by_id(db: Session, id: int) -> Optional[User]:
    """Return the user whose primary key equals *id*, or ``None``."""
    return db.query(User).filter(User.id == id).one_or_none()


def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Return the user whose username equals *username*, or ``None``."""
    return db.query(User).filter(User.username == username).one_or_none()


def get_user_by_username_or_email(
    db: Session, username: str, email: str
) -> Optional[User]:
    """Return a user matching *username* OR *email*, or ``None``.

    Passing several criteria to ``filter()`` combines them with AND, which
    would miss a user that shares only one of the two values, so the
    criteria are joined explicitly with ``or_``.
    """
    return (
        db.query(User)
        .filter(or_(User.username == username, User.email == email))
        # The username and the email may belong to two different rows, so
        # more than one match is possible; first() tolerates that, whereas
        # one_or_none() would raise MultipleResultsFound.
        .first()
    )


def create_root_user(db: Session, root_email: str, root_password: str):
    """Create the ``root`` user with the ``admin`` role if it is missing.

    Does nothing when a user named ``root`` already exists.

    Args:
        db: Session used for the lookups and the insert.
        root_email: Email assigned to the new root user.
        root_password: Raw password; hashed by ``create_user``.
    """
    root_user = get_user_by_username(db, "root")
    admin_role = role_service.get_role_by_name(db, "admin")
    if root_user is not None:
        return

    root_user = User(
        username="root",
        email=root_email,
        password=root_password,
        given_name="Root",
        role=admin_role,
        birthdate=date.today(),
    )
    create_user(db, root_user)


def delete_user_by_id(db: Session, id: int):
    """Delete the user with primary key *id*; do nothing if it is absent.

    Raises:
        ValueError: If the targeted user is ``root``.
    """
    user = get_user_by_id(db, id)
    # Check for a missing user before touching its attributes; reading
    # ``user.username`` on ``None`` would raise AttributeError.
    if user is None:
        return
    if user.username == "root":
        raise ValueError("The root user cannot be deleted")

    db.delete(user)
    db.commit()


def passwords_match(hashed: str, raw: str) -> bool:
    """Return whether *raw* verifies against the stored hash *hashed*."""
    return pwd_context.verify(raw, hashed)