from services.password import verify_password
from fastapi_jwt_auth import AuthJWT
from fastapi import Depends, HTTPException, status
from database.auth.crud import get_user_from_clientId_db, get_user_from_username_db
from sqlmodel import Session
from database.db import get_session
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# OAuth2 password-flow bearer scheme, used as a dependency by the JWT guards
# in this module. tokenUrl is the login route advertised to clients.
# NOTE(review): as far as FastAPI documents it, this dependency only pulls the
# token out of the Authorization header; it does not validate the token. The
# actual verification is the AuthJWT call made in each guard.
bearer = OAuth2PasswordBearer(tokenUrl='/login')
def authenticate_user(
    user: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_session),
):
    """Check a login form's credentials and return the matching user record.

    The account is looked up by ``user.username``; only when one is found is
    ``user.password`` checked against its ``hashed_password`` through
    ``verify_password``.

    Raises:
        HTTPException: 401 with a ``username_error`` detail when no account
            matches, or 401 with a ``password_error`` detail when the
            password check fails.

    NOTE(review): the two failures are distinguishable to the caller
    (different ``detail`` keys and messages), so an unauthenticated client can
    learn whether a username exists. The unknown-username path also skips
    ``verify_password``, so it probably answers faster as well. Consider one
    generic "invalid credentials" response plus a dummy hash verification on
    the miss path; coordinate with whatever client reads these keys before
    changing them.
    NOTE(review): no attempt limiting is visible here. Confirm that rate
    limiting or lockout is enforced elsewhere.
    """
    account = get_user_from_username_db(user.username, db)

    if not account:
        failure = {"username_error": "Utilisateur introuvable"}
    elif not verify_password(user.password, account.hashed_password):
        failure = {"password_error": "Mot de passe invalide"}
    else:
        return account

    # Both rejection paths share the same status code and challenge header.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=failure,
        headers={"WWW-Authenticate": "Bearer"},
    )
def jwt_required(
    Authorize: AuthJWT = Depends(),
    token: str = Depends(bearer),
):
    """Dependency that demands a JWT on the request and returns the AuthJWT helper.

    Delegates the check to ``AuthJWT.jwt_required()``; whatever that call
    raises on a missing or invalid token propagates to the caller.

    ``token`` is never read in the body. Declaring it makes the ``bearer``
    scheme a dependency of every route that uses this guard.

    NOTE(review): signing algorithm, token lifetime and revocation (denylist)
    are configured outside this file. Confirm them there, and confirm the
    pinned ``fastapi_jwt_auth`` release still receives security fixes.
    """
    guard = Authorize
    guard.jwt_required()
    return guard
def jwt_optional(
    Authorize: AuthJWT = Depends(),
):
    """Dependency for routes that accept both anonymous and authenticated calls.

    Delegates to ``AuthJWT.jwt_optional()`` and hands back the same helper so
    the route can read the subject, if any.

    Unlike the other guards in this module, it does not depend on ``bearer``.

    NOTE(review): how a present-but-invalid token is handled is decided
    inside ``AuthJWT.jwt_optional()``. Confirm it is rejected rather than
    silently downgraded to anonymous.
    """
    guard = Authorize
    guard.jwt_optional()
    return guard
def jwt_refresh_required(
    Authorize: AuthJWT = Depends(),
    token: str = Depends(bearer),
):
    """Dependency that demands a refresh token and returns the AuthJWT helper.

    Delegates to ``AuthJWT.jwt_refresh_token_required()``; its exceptions
    propagate unchanged.

    ``token`` is never read in the body. Declaring it makes the ``bearer``
    scheme a dependency of every route that uses this guard.

    NOTE(review): intended for the token-renewal route only. Confirm refresh
    tokens are not accepted anywhere an access token is expected.
    """
    guard = Authorize
    guard.jwt_refresh_token_required()
    return guard
def fresh_jwt_required(
    Authorize: AuthJWT = Depends(),
    token: str = Depends(bearer),
):
    """Dependency that demands a *fresh* access token and returns the AuthJWT helper.

    Delegates to ``AuthJWT.fresh_jwt_required()``; its exceptions propagate
    unchanged.

    ``token`` is never read in the body. Declaring it makes the ``bearer``
    scheme a dependency of every route that uses this guard.

    NOTE(review): presumably meant for sensitive operations such as
    credential changes. Confirm that only the password login issues tokens
    marked fresh, never the refresh route.
    """
    guard = Authorize
    guard.fresh_jwt_required()
    return guard
def get_current_clientId(
    Authorize: AuthJWT = Depends(jwt_required),
):
    """Return the subject claim of the request's JWT.

    Depends on ``jwt_required``, so the token check has already run by the
    time the subject is read. The value is returned exactly as
    ``AuthJWT.get_jwt_subject()`` gives it, with no further validation here.
    """
    subject = Authorize.get_jwt_subject()
    return subject
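def get_current_user(
    clientId: str = Depends(get_current_clientId),
    db: Session = Depends(get_session),
):
    """Resolve the authenticated request to its user record.

    ``clientId`` comes from the JWT subject (via ``get_current_clientId``)
    and is looked up with ``get_user_from_clientId_db``.

    Raises:
        HTTPException: 401 when the lookup returns nothing, so a token whose
            subject no longer maps to a user is rejected.

    NOTE(review): only existence is checked. If the user model carries a
    disabled or locked flag, it is not consulted here; verify against the
    model.
    """
    user = get_user_from_clientId_db(clientId, db)
    if not user:
        # A 401 should carry a WWW-Authenticate challenge (RFC 7235); this
        # also matches the 401 responses raised by authenticate_user.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Utilisateur introuvable',
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user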
def get_current_user_optional(
    Authorize: AuthJWT = Depends(jwt_optional),
    db: Session = Depends(get_session),
):
    """Return the caller's user record when a JWT subject is present, else None.

    With no subject, the request is treated as anonymous and ``None`` is
    returned. With a subject, the result of ``get_user_from_clientId_db`` is
    returned as-is.

    NOTE(review): if the lookup finds no user for a subject, that result is
    returned without raising, unlike ``get_current_user``. Callers must treat
    a falsy result as "not authenticated" and never as a privileged default.
    """
    subject = Authorize.get_jwt_subject()
    return get_user_from_clientId_db(subject, db) if subject else None