Kilton937342 4941b5a154 tests
2023-02-22 12:43:39 +01:00

98 lines
4.4 KiB
Python

from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi_jwt_auth import AuthJWT
from pydantic import BaseModel
from sqlmodel import Session, select
from database.auth.crud import change_user_uuid, check_unique_username, create_user_db, delete_user_db, \
update_password_db, update_user_db, parse_user_rooms
from database.auth.models import PasswordSet, User, UserEdit, UserRead, UserRegister, UserEditRead
from database.db import get_session
from services.auth import authenticate_user
from services.auth import get_current_clientId, get_current_user, get_current_user_optional, jwt_refresh_required
from services.password import get_password_hash, verify_password
# Router on which every endpoint in this module is registered; all of its
# routes are tagged 'Authentification'.
router = APIRouter(tags=['Authentification'])
class Token(BaseModel):
    """Response schema for a JWT access/refresh pair.

    Used as ``response_model`` by the ``/login`` and ``/register`` routes.
    """

    # Encoded access token.
    access_token: str
    # Token scheme label; the endpoints in this module always send "bearer".
    token_type: str
    # Encoded refresh token.
    refresh_token: str
@router.post("/login", response_model=Token)
def login_for_access_token(user: User = Depends(authenticate_user)):
    """Issue a fresh access token and a refresh token for ``user``.

    Args:
        user: The user supplied by the ``authenticate_user`` dependency.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.
        Both tokens use ``str(user.clientId)`` as subject and carry the
        username as a custom claim.
    """
    # The JWT helper is instantiated directly here instead of being injected.
    jwt_helper = AuthJWT()
    subject = str(user.clientId)
    username = user.username
    return {
        "access_token": jwt_helper.create_access_token(
            subject=subject, fresh=True, user_claims={"username": username}
        ),
        "refresh_token": jwt_helper.create_refresh_token(
            subject=subject, user_claims={"username": username}
        ),
        "token_type": "bearer",
    }
@router.post('/register', response_model=Token)
def register(
    user: UserRegister = Depends(UserRegister.as_form),
    Authorize: AuthJWT = Depends(),
    db: Session = Depends(get_session),
):
    """Create a new account and return an access/refresh token pair for it.

    Args:
        user: Registration form data (username and password).
        Authorize: JWT helper used to create the tokens.
        db: Database session.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.

    Raises:
        HTTPException: 400 when ``check_unique_username`` returns a falsy
            value for the requested username.
    """
    username = check_unique_username(user.username, db)
    if not username:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={'username_error': "Nom d'utilisateur indisponible"},
        )

    # Only the hash of the submitted password is handed to the persistence layer.
    hashed = get_password_hash(user.password)
    created = create_user_db(username, hashed, db)

    access = Authorize.create_access_token(
        subject=str(created.clientId),
        user_claims={"username": created.username},
    )
    refresh_tok = Authorize.create_refresh_token(
        subject=str(created.clientId),
        user_claims={"username": created.username},
    )
    return {"access_token": access, "refresh_token": refresh_tok, "token_type": "bearer"}
@router.get('/users', response_model=List[UserRead])
def get_users(db: Session = Depends(get_session)):
    """Return every ``User`` row; the response is shaped by ``UserRead``.

    Args:
        db: Database session.
    """
    # NOTE(review): no authentication dependency is declared on this route;
    # confirm that listing all users publicly is intended.
    query = select(User)
    return db.exec(query).all()
@router.get('/user', response_model=UserRead)
def get_user(user: User = Depends(get_current_user), db: Session = Depends(get_session)):
    """Return the current user's data with a ``rooms`` entry added.

    Args:
        user: The user supplied by the ``get_current_user`` dependency.
        db: Database session passed to ``parse_user_rooms``.

    Returns:
        ``user.dict()`` with its ``"rooms"`` key set to the result of
        ``parse_user_rooms(user, db)``.
    """
    payload = user.dict()
    payload["rooms"] = parse_user_rooms(user, db)
    return payload
@router.put('/user', response_model=UserEditRead)
def update_user(
    user: UserEdit = Depends(UserEdit.as_form),
    clientId: str = Depends(get_current_clientId),
    db: Session = Depends(get_session),
):
    """Apply the submitted edits to the user identified by ``clientId``.

    Args:
        user: Edit form data.
        clientId: Identifier supplied by the ``get_current_clientId`` dependency.
        db: Database session.

    Returns:
        Whatever ``update_user_db`` returns, shaped by ``UserEditRead``.
    """
    return update_user_db(clientId, user, db)
@router.put('/user/password')
def update_password(
    password: PasswordSet = Depends(PasswordSet.as_form),
    user: User = Depends(get_current_user),
    db: Session = Depends(get_session),
    Authorize: AuthJWT = Depends(),
):
    """Change the current user's password and issue a new token pair.

    Args:
        password: Form data carrying ``old_password`` and the new ``password``.
        user: The user supplied by the ``get_current_user`` dependency.
        db: Database session.
        Authorize: JWT helper used to create the new tokens.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.
        The tokens' subject is the value returned by ``change_user_uuid``.

    Raises:
        HTTPException: 401 when ``old_password`` does not match the stored hash.
    """
    if not verify_password(password.old_password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={'old_password_error': 'Mot de passe invalide'},
        )

    # NOTE(review): the raw new password is passed on as-is; presumably
    # update_password_db hashes it before storing. Confirm.
    # The return value is not needed here (it was previously stored and then
    # immediately overwritten).
    update_password_db(user.id, password.password, db)

    # change_user_uuid returns the value used as the subject of the new tokens
    # (presumably the rotated client id, which would invalidate older tokens;
    # verify against its implementation).
    new_subject = str(change_user_uuid(user.id, db))

    access_token = Authorize.create_access_token(
        subject=new_subject, user_claims={"username": user.username})
    refresh_token = Authorize.create_refresh_token(
        subject=new_subject, user_claims={"username": user.username})
    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
@router.post('/logout')
def logout(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_session),
):
    """Log the current user out by calling ``change_user_uuid`` for them.

    Args:
        user: The user supplied by the ``get_current_user`` dependency.
        db: Database session.

    Returns:
        ``{'ok': True}``.
    """
    # The value returned by change_user_uuid is intentionally not used here.
    change_user_uuid(user.id, db)
    acknowledgement = {'ok': True}
    return acknowledgement
@router.delete('/user')
def delete_user(
    user: User = Depends(authenticate_user),
    db: Session = Depends(get_session),
):
    """Delete the account of the user resolved by ``authenticate_user``.

    This route depends on ``authenticate_user`` (the same dependency as
    ``/login``) rather than on ``get_current_user``.

    Args:
        user: The user supplied by the ``authenticate_user`` dependency.
        db: Database session.

    Returns:
        ``{'ok': True}``.
    """
    delete_user_db(user.id, db)
    acknowledgement = {'ok': True}
    return acknowledgement
@router.post('/check-access',)
def check_token(user: User = Depends(get_current_user_optional)):
    """Report whether the request resolved to a user.

    Args:
        user: The value supplied by the ``get_current_user_optional``
            dependency; may be ``None``.

    Returns:
        ``{'username': <username>}`` when a user is present, otherwise ``False``.
    """
    # Identity check: `!= None` would go through the model's own equality
    # implementation instead of testing for the None singleton.
    if user is None:
        return False
    return {'username': user.username}
@router.post('/refresh')
def refresh(Authorize: AuthJWT = Depends(jwt_refresh_required)):
    """Create a new access token from the current JWT's subject and username.

    Args:
        Authorize: JWT helper supplied by the ``jwt_refresh_required`` dependency.

    Returns:
        A dict containing only ``access_token``.
    """
    subject = Authorize.get_jwt_subject()
    # The username claim is copied from the incoming token into the new one.
    claims = Authorize.get_raw_jwt()
    username = claims['username']
    token = Authorize.create_access_token(
        subject=subject,
        user_claims={"username": username},
    )
    return {"access_token": token}