Commit existing codebase

This commit is contained in:
Andrey Chervyakov 2021-02-25 01:39:14 +06:00
commit 49bc902bb9
24 changed files with 1208 additions and 0 deletions

0
app/user/__init__.py Normal file
View file

60
app/user/dto.py Normal file
View file

@ -0,0 +1,60 @@
import calendar
from datetime import datetime, date
from typing import Optional
from pydantic import BaseModel
from app.user.model import User, Sex
class UserCreationModel(BaseModel):
username: str
email: str
password: str
given_name: str
family_name: Optional[str]
sex: str
birthdate: Optional[int]
def to_entity(self) -> User:
birthdate = None
if self.birthdate is not None:
birthdate = date.fromtimestamp(self.birthdate)
return User(
username=self.username,
email=self.email,
password=self.password,
given_name=self.given_name,
family_name=self.family_name,
sex=Sex[self.sex],
birthdate=birthdate
)
class UserResourceModel(BaseModel):
id: int
username: str
email: str
given_name: str
family_name: Optional[str]
sex: str
birthdate: Optional[int]
@staticmethod
def from_entity(user: User):
birthdate = None
if user.birthdate is not None:
birthdate = calendar.timegm(user.birthdate.timetuple())
return UserResourceModel(
id=user.id,
username=user.username,
email=user.email,
given_name=user.given_name,
family_name=user.family_name,
sex=user.sex.name,
birthdate=birthdate
)

59
app/user/handlers.py Normal file
View file

@ -0,0 +1,59 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from starlette.responses import Response
from app.auth.middleware import get_auth_user
from app.user.dto import UserCreationModel, UserResourceModel
from app.user.model import User
import app.user.service as user_service
from app.db import get_db
router = APIRouter()
@router.post("", status_code=201, response_model=UserResourceModel)
async def create_user(model: UserCreationModel, db: Session = Depends(get_db)) -> UserResourceModel:
user = model.to_entity()
created_user = user_service.create_user(db, user)
return UserResourceModel.from_entity(created_user)
@router.get("/{id}", status_code=200, response_model=UserResourceModel)
async def get_user_by_id(
id: int,
auth_user: User = Depends(get_auth_user),
db: Session = Depends(get_db)
) -> UserResourceModel:
check_access(auth_user, id)
user = user_service.get_user_by_id(db, id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return UserResourceModel.from_entity(user)
@router.delete("/{id}", status_code=204)
async def delete_user_by_id(
id: int,
auth_user: User = Depends(get_auth_user),
db: Session = Depends(get_db)
):
check_access(auth_user, id)
user_service.delete_user_by_id(db, id)
return Response(status_code=204)
def check_access(auth_user: User, param: Any):
access_exception = HTTPException(status_code=403, detail="Forbidden")
if type(param) is int:
if not (auth_user.id == param):
raise access_exception
elif type(param) is User:
if not ((auth_user.username == param.username) and (auth_user.email == param.email)):
raise access_exception

29
app/user/model.py Normal file
View file

@ -0,0 +1,29 @@
import enum
from datetime import datetime
from sqlalchemy import Column, String, Integer, DateTime, Enum, Date
from app.db import EntityBase
class Sex(enum.Enum):
"""Human sex according to ISO/IEC 5218."""
NOT_KNOWN = 0,
MALE = 1,
FEMALE = 2,
NOT_APPLICABLE = 9
class User(EntityBase):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(length=32), unique=True, nullable=False)
email = Column(String, unique=True, nullable=False)
password = Column(String, nullable=False)
given_name = Column(String(length=32), nullable=False)
family_name = Column(String(length=32))
sex = Column(Enum(Sex), nullable=False, default=Sex.NOT_KNOWN)
birthdate = Column(Date)
creation_date = Column(DateTime, nullable=False, default=datetime.utcnow())

42
app/user/service.py Normal file
View file

@ -0,0 +1,42 @@
from typing import Optional
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.user.model import User
pwd_context = CryptContext(schemes=["bcrypt"])
def create_user(db: Session, user: User) -> User:
if get_user_by_username_or_email(db, user.username, user.email) is not None:
raise Exception()
user.password = pwd_context.hash(user.password)
db.add(user)
db.commit()
db.refresh(user)
return user
def get_user_by_id(db: Session, id: int) -> Optional[User]:
return db.query(User).filter(User.id == id).one_or_none()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).one_or_none()
def get_user_by_username_or_email(db: Session, username: str, email: str) -> Optional[User]:
return db.query(User).filter(User.username == username, User.email == email).one_or_none()
def delete_user_by_id(db: Session, id: int):
user = get_user_by_id(db, id)
if user is not None:
db.delete(user)
db.commit()
def passwords_match(hashed: str, raw: str) -> bool:
return pwd_context.verify(raw, hashed)