65 lines
1.8 KiB
Python
65 lines
1.8 KiB
Python
from datetime import date
|
|
from typing import Optional
|
|
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.user.model import User
|
|
from app.role import service as role_service
|
|
|
|
# Module-wide passlib context configured with the bcrypt scheme; the functions
# in this module use it to hash new passwords and to verify raw passwords
# against stored hashes.
pwd_context = CryptContext(schemes=["bcrypt"])
|
|
|
|
|
|
def create_user(db: Session, user: User) -> User:
    """Hash the user's password and persist the user.

    The raw value in ``user.password`` is replaced in place by its hash
    (produced by ``pwd_context``) before the object is added and committed.

    Args:
        db: Session used for the existence check and the insert.
        user: User to store; ``user.password`` holds the raw password on entry.

    Returns:
        The same ``user`` object, refreshed from the database after commit.

    Raises:
        Exception: If ``get_user_by_username_or_email`` returns an existing
            user for ``user.username`` / ``user.email``. Any error raised by
            the commit itself is re-raised after the session is rolled back.
    """
    existing_user = get_user_by_username_or_email(db, user.username, user.email)
    if existing_user is not None:
        raise Exception("User already exists")

    user.password = pwd_context.hash(user.password)
    db.add(user)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session's transaction unusable until it
        # is rolled back; reset it and let the caller see the original error.
        db.rollback()
        raise
    db.refresh(user)
    return user
|
|
|
|
|
|
def get_user_by_id(db: Session, id: int) -> Optional[User]:
    """Fetch the user whose ``User.id`` equals ``id``.

    Args:
        db: Session to run the query on.
        id: Identifier to match against ``User.id``.

    Returns:
        The matching ``User``, or ``None`` when the query yields no row.
    """
    id_query = db.query(User).filter(User.id == id)
    return id_query.one_or_none()
|
|
|
|
|
|
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Fetch the user whose ``User.username`` equals ``username``.

    Args:
        db: Session to run the query on.
        username: Username to match exactly.

    Returns:
        The matching ``User``, or ``None`` when the query yields no row.
    """
    username_query = db.query(User).filter(User.username == username)
    return username_query.one_or_none()
|
|
|
|
|
|
def get_user_by_username_or_email(
    db: Session, username: str, email: str
) -> Optional[User]:
    """Return a user whose username OR email matches, or ``None``.

    Args:
        db: Session to run the query on.
        username: Username to match against ``User.username``.
        email: Email address to match against ``User.email``.

    Returns:
        A ``User`` matching at least one of the two values, or ``None`` when
        no row matches either. If several rows match, the first one returned
        by the query is used.
    """
    # Separate criteria passed to ``filter()`` are combined with AND, which
    # would only find a row matching both values; ``|`` builds the OR that
    # the function name promises.
    matches_either = (User.username == username) | (User.email == email)
    # ``first()`` instead of ``one_or_none()``: one row may match the username
    # while a different row matches the email, and that must not raise.
    return db.query(User).filter(matches_either).first()
|
|
|
|
|
|
def create_root_user(db: Session, root_email: str, root_password: str):
    """Create the ``"root"`` user unless one already exists.

    The new user gets the role returned by
    ``role_service.get_role_by_name(db, "admin")``, the given name ``"Root"``
    and today's date as birthdate, and is stored through ``create_user``.

    Args:
        db: Session used for the lookups and the insert.
        root_email: Email address assigned to the root user.
        root_password: Raw password handed to ``create_user``.
    """
    existing_root = get_user_by_username(db, "root")
    admin = role_service.get_role_by_name(db, "admin")

    # Nothing to do when a user named "root" is already present.
    if existing_root is not None:
        return

    create_user(
        db,
        User(
            username="root",
            email=root_email,
            password=root_password,
            given_name="Root",
            role=admin,
            birthdate=date.today(),
        ),
    )
|
|
|
|
|
|
def delete_user_by_id(db: Session, id: int):
    """Delete the user found by ``get_user_by_id``, if there is one.

    Args:
        db: Session used for the lookup and the delete.
        id: Identifier of the user to remove.

    Raises:
        Exception: If the matching user's username is ``"root"``.
    """
    user = get_user_by_id(db, id)

    # Nothing to delete; return before reading attributes of a missing user,
    # which would otherwise fail with AttributeError on ``None``.
    if user is None:
        return

    if user.username == "root":
        raise Exception("The root user cannot be deleted")

    db.delete(user)
    db.commit()
|
|
|
|
|
|
def passwords_match(hashed: str, raw: str) -> bool:
    """Check a raw password against a stored hash using ``pwd_context``.

    Args:
        hashed: Previously stored password hash.
        raw: Raw password to verify.

    Returns:
        The result of ``pwd_context.verify(raw, hashed)``.
    """
    is_match = pwd_context.verify(raw, hashed)
    return is_match
|