44 lines
1.2 KiB
Python
44 lines
1.2 KiB
Python
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from jose import jwt
|
|
from jose.constants import ALGORITHMS
|
|
from sqlalchemy.orm import Session
|
|
import secrets
|
|
|
|
from app.auth.dto import Credentials
|
|
from app.config import config
|
|
from app.user.service import get_user_by_username, passwords_match
|
|
|
|
# Shared secret used both to sign tokens (issue_token) and to verify them
# (verify_token); read from application config when the module is imported.
JWT_SECRET = config["CGNO_ID_JWT_SECRET"]
|
|
# Value written to the "iss" claim of issued tokens and passed as the
# expected issuer when tokens are decoded.
JWT_ISSUER = "Energia"
|
|
|
|
|
|
def authenticate(credentials: Credentials, db: Session) -> Optional[str]:
    """Check a username/password pair and issue a token on success.

    The user is looked up by ``credentials.username``; the stored username
    is then re-compared in constant time and the password is checked with
    ``passwords_match``.

    Args:
        credentials: Submitted login data; ``username`` and ``password``
            are read.
        db: Database session handed to ``get_user_by_username``.

    Returns:
        The token produced by ``issue_token`` for the user's id when both
        checks pass, otherwise ``None``.
    """
    user = get_user_by_username(db, credentials.username)
    # Assumes the lookup reports "no such user" by returning None -- TODO
    # confirm against app.user.service. Without this guard the attribute
    # accesses below would raise AttributeError for unknown usernames.
    if user is None:
        return None

    # secrets.compare_digest raises TypeError for str arguments containing
    # non-ASCII characters, so compare the UTF-8 encodings instead of the
    # raw strings.
    stored_name = user.username.encode("utf-8")
    supplied_name = credentials.username.encode("utf-8")
    if not secrets.compare_digest(stored_name, supplied_name):
        return None

    if not passwords_match(user.password, credentials.password):
        return None

    return issue_token(user.id)
|
|
|
|
|
|
def issue_token(user_id: int) -> str:
    """Build and sign an HS256 JWT for ``user_id``, valid for one week.

    Claims set: ``sub`` (the user id as a string), ``iss`` (``JWT_ISSUER``),
    ``iat`` and ``nbf`` (the current time) and ``exp`` (one week later).

    Args:
        user_id: Identifier stored in the ``sub`` claim.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Function-scope import: the module header only imports datetime and
    # timedelta from the datetime module.
    from datetime import timezone

    # Use an explicitly UTC-aware timestamp. datetime.utcnow() is deprecated
    # since Python 3.12 and returns a naive value that is easy to misread as
    # local time.
    # NOTE(review): python-jose is expected to convert datetime claims to
    # numeric UTC timestamps on encode -- confirm for the pinned version.
    now = datetime.now(timezone.utc)
    claims = {
        "sub": str(user_id),
        "iss": JWT_ISSUER,
        "iat": now,
        "nbf": now,
        "exp": now + timedelta(weeks=1),
    }
    return jwt.encode(claims, JWT_SECRET, algorithm=ALGORITHMS.HS256)
|
|
|
|
|
|
def verify_token(token: str) -> dict:
    """Decode ``token`` with the shared secret and return the result.

    The token is decoded with ``JWT_SECRET``, restricted to HS256, and
    ``JWT_ISSUER`` is passed as the expected issuer. Any exception raised
    by ``jwt.decode`` propagates to the caller.

    Args:
        token: Encoded JWT string to check.

    Returns:
        Whatever ``jwt.decode`` returns for the token (annotated as a dict).
    """
    decode_kwargs = {
        "algorithms": ALGORITHMS.HS256,
        "issuer": JWT_ISSUER,
    }
    return jwt.decode(token, JWT_SECRET, **decode_kwargs)
|