from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from services.jwt import revoke_access
from services.password import get_password_hash, verify_password
from services.auth import get_current_clientId, get_current_user, get_current_user_optional, jwt_refresh_required
from database.auth.crud import change_user_uuid, check_unique_username, create_user_db, delete_user_db, update_password_db, update_user_db
from services.auth import authenticate_user
from database.auth.models import PasswordSet, User, UserEdit, UserRead, UserRegister
from pydantic import BaseModel
from fastapi_jwt_auth import AuthJWT
from sqlmodel import Session,select
from database.db import get_session
# Shared router for the authentication endpoints; the tag only groups them
# in the generated OpenAPI documentation.
router = APIRouter(tags=['Authentification'])
class Token(BaseModel):
    """Response body of /login and /register: a bearer access/refresh token pair."""

    access_token: str
    # Every endpoint in this module returns the literal "bearer" here.
    token_type: str
    refresh_token: str
@router.post("/login", response_model=Token)
def login_for_access_token(user: User = Depends(authenticate_user)):
    """Exchange valid credentials for an access/refresh token pair.

    The credential check itself lives in ``authenticate_user``; by the time
    this body runs ``user`` is the authenticated account. The token subject
    is the user's ``clientId`` rather than the primary key — presumably so
    that ``change_user_uuid`` (used by /logout and the password change) can
    invalidate outstanding tokens by rotating it; confirm in services.auth.

    Returns:
        dict matching ``Token``.
    """
    subject = str(user.clientId)
    authorize = AuthJWT()
    # fresh=True marks a token obtained directly from credentials, as opposed
    # to one minted by /refresh.
    return {
        "access_token": authorize.create_access_token(subject=subject, fresh=True),
        "refresh_token": authorize.create_refresh_token(subject=subject),
        "token_type": "bearer",
    }
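@router.post('/register', response_model=Token)
def register(user: UserRegister = Depends(UserRegister.as_form), Authorize: AuthJWT = Depends(), db: Session = Depends(get_session)):
    """Create an account from the registration form and log the new user in.

    Args:
        user: validated registration form (username + password).
        Authorize: fastapi-jwt-auth helper used to mint the tokens.
        db: request-scoped database session.

    Returns:
        dict matching ``Token``: an access/refresh pair for the new account.

    Raises:
        HTTPException 400 with ``username_error`` when the username is not
        available (``check_unique_username`` returned a falsy value).
    """
    # check_unique_username's return value is what gets stored as the
    # username (presumably the same string when it is free); a falsy result
    # means the name is taken.
    username = check_unique_username(user.username, db)
    if not username:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={'username_error': "Nom d'utilisateur indisponible"},
        )

    # Only the password hash ever reaches the database layer.
    new_user = create_user_db(username, get_password_hash(user.password), db)
    subject = str(new_user.clientId)
    # fresh=True for parity with /login: the caller has just proven the
    # password, so this token is exactly as fresh as one from a credential
    # login (previously /register issued a non-fresh token).
    access_token = Authorize.create_access_token(subject=subject, fresh=True)
    refresh_token = Authorize.create_refresh_token(subject=subject)
    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}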
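@router.get('/users', response_model=List[UserRead])
def get_users(db: Session = Depends(get_session), _: User = Depends(get_current_user)):
    """Return every user, projected through ``UserRead``.

    Requires a valid access token. The endpoint was previously reachable
    anonymously, which let anyone enumerate the complete account list
    (whatever fields ``UserRead`` exposes). ``get_current_user`` is the same
    dependency the other protected endpoints use; it is expected to reject
    requests without a usable token before this body runs — confirm in
    services.auth.

    Args:
        db: request-scoped database session.
        _: the authenticated caller; only its presence matters here.
    """
    return db.exec(select(User)).all()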
@router.put('/user', response_model=UserRead)
def update_user(user: UserEdit = Depends(UserEdit.as_form), clientId: str = Depends(get_current_clientId), db: Session = Depends(get_session)):
    """Update the calling user's editable profile fields.

    The target account comes from the access token (``get_current_clientId``),
    never from the request body, so a caller can only modify their own row.

    Returns:
        the updated user as returned by ``update_user_db``, serialised as
        ``UserRead``.
    """
    return update_user_db(clientId, user, db)
@router.put('/user/password')
def update_password(password: PasswordSet = Depends(PasswordSet.as_form), user: User = Depends(get_current_user), db: Session = Depends(get_session), Authorize: AuthJWT = Depends()):
    """Change the caller's password after re-verifying the current one.

    Flow: verify ``old_password`` against the stored hash, write the new
    password, rotate the user's ``clientId`` (presumably orphaning tokens
    minted for the old subject), then hand back a new token pair.

    Raises:
        HTTPException 401 with ``old_password_error`` when the old password
        does not match ``user.hashed_password``.
    """
    if not verify_password(password.old_password, user.hashed_password):
        raise HTTPException(status_code=401, detail={'old_password_error': 'Mot de passe invalide'})

    # NOTE(review): /register hashes with get_password_hash before calling the
    # CRUD layer, but here the raw ``password.password`` is passed straight to
    # update_password_db. Confirm that update_password_db hashes internally;
    # otherwise the new password is stored in clear.
    update_password_db(user.id, password.password, db)
    rotated = change_user_uuid(user.id, db)

    # NOTE(review): /login uses str(user.clientId) as the subject; here it is
    # str() of whatever change_user_uuid returns. If that is the User row
    # rather than the new UUID, the subject becomes the model's repr —
    # confirm the return type. Unlike /login, this access token is not
    # fresh=True even though the password was just re-entered.
    subject = str(rotated)
    return {
        "access_token": Authorize.create_access_token(subject=subject),
        "refresh_token": Authorize.create_refresh_token(subject=subject),
        "token_type": "bearer",
    }
@router.post('/logout')
def logout(user: User = Depends(get_current_user), db: Session = Depends(get_session),):
    """Log the caller out by rotating their ``clientId``.

    Tokens carry the clientId as subject, so changing it is presumably what
    makes previously issued access and refresh tokens stop resolving to this
    user (confirm in get_current_user / get_current_clientId). Because the
    rotation is per account, this ends every session of the user, not only
    the one presenting the token. ``revoke_access`` from services.jwt is
    imported in this module but not used here.
    """
    change_user_uuid(user.id, db)
    return {'ok': True}
@router.delete('/user')
def delete_user(user: User = Depends(authenticate_user), db: Session = Depends(get_session)):
    """Delete an account.

    Unlike the other mutating endpoints, this one is guarded by
    ``authenticate_user`` (the same credential check /login uses) rather
    than a bearer token: deleting requires re-entering the password, not
    merely holding a valid access token.
    """
    delete_user_db(user.id, db)
    return {'ok': True}
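@router.post('/check-access',)
def check_token(user: User = Depends(get_current_user_optional)):
    """Report whether the request carries a usable access token.

    Returns:
        ``{'username': <name>}`` when ``get_current_user_optional`` resolved a
        user, otherwise the bare boolean ``False``. Note the two response
        shapes; clients must handle both.
    """
    # Identity test rather than ``!= None``: ORM/pydantic models may define
    # their own __eq__, and None is a singleton.
    if user is None:
        return False
    return {'username': user.username}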
@router.post('/refresh')
def refresh(Authorize: AuthJWT = Depends(jwt_refresh_required)):
    """Mint a new access token from a valid refresh token.

    ``jwt_refresh_required`` validates the presented refresh token before this
    body runs. The new access token reuses that token's subject and is not
    fresh (it was not obtained from credentials). The refresh token itself is
    not rotated here, so the same one keeps working until it expires.
    """
    subject = Authorize.get_jwt_subject()
    return {"access_token": Authorize.create_access_token(subject=subject)}