from typing import List from fastapi import APIRouter, Depends, HTTPException, status from fastapi_jwt_auth import AuthJWT from pydantic import BaseModel from sqlmodel import Session, select from database.auth.crud import change_user_uuid, check_unique_username, create_user_db, delete_user_db, \ update_password_db, update_user_db, parse_user_rooms from database.auth.models import PasswordSet, User, UserEdit, UserRead, UserRegister, UserEditRead from database.db import get_session from services.auth import authenticate_user from services.auth import get_current_clientId, get_current_user, get_current_user_optional, jwt_refresh_required from services.password import get_password_hash, verify_password router = APIRouter(tags=['Authentification']) class Token(BaseModel): access_token: str token_type: str refresh_token: str @router.post("/login", response_model=Token) def login_for_access_token(user: User = Depends(authenticate_user)): Authorize = AuthJWT() access_token = Authorize.create_access_token( subject=str(user.clientId), fresh=True, user_claims={"username": user.username}) refresh_token = Authorize.create_refresh_token(subject=str( user.clientId), user_claims={"username": user.username}) return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"} @router.post('/register', response_model=Token) def register(user: UserRegister = Depends(UserRegister.as_form), Authorize: AuthJWT = Depends(), db: Session = Depends(get_session)): username = check_unique_username(user.username, db) if not username: raise HTTPException(status_code = status.HTTP_400_BAD_REQUEST,detail={'username_error': "Nom d'utilisateur indisponible"}) user = create_user_db(username, get_password_hash(user.password), db) access_token = Authorize.create_access_token( subject=str(user.clientId), user_claims={"username": user.username}) refresh_token = Authorize.create_refresh_token(subject=str( user.clientId), user_claims={"username": user.username}) return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"} @router.get('/users', response_model=List[UserRead]) def get_users(db: Session = Depends(get_session)): users = db.exec(select(User)).all() return users @router.get('/user', response_model=UserRead) def get_user(user: User = Depends(get_current_user), db: Session = Depends(get_session)): return {**user.dict(), "rooms": parse_user_rooms(user, db)} @router.put('/user' , response_model=UserEditRead,) def update_user(user: UserEdit = Depends(UserEdit.as_form), clientId: str = Depends(get_current_clientId), db: Session = Depends(get_session)): user_obj = update_user_db(clientId, user, db) return user_obj @router.put('/user/password') def update_password(password: PasswordSet = Depends(PasswordSet.as_form), user: User = Depends(get_current_user), db: Session = Depends(get_session), Authorize: AuthJWT = Depends()): isValid = verify_password(password.old_password, user.hashed_password) if not isValid: raise HTTPException(status_code=401, detail={'old_password_error': 'Mot de passe invalide'}) user_obj = update_password_db(user.id, password.password, db) user_obj = change_user_uuid(user.id, db) access_token = Authorize.create_access_token( subject=str(user_obj), user_claims={"username": user.username}) refresh_token = Authorize.create_refresh_token( subject=str(user_obj), user_claims={"username": user.username}) return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"} @router.post('/logout') def logout(user: User = Depends(get_current_user), db: Session = Depends(get_session),): change_user_uuid(user.id, db) return {'ok': True} @router.delete('/user') def delete_user(user: User = Depends(authenticate_user), db: Session = Depends(get_session)): delete_user_db(user.id, db) return {'ok': True} @router.post('/check-access',) def check_token(user: User = Depends(get_current_user_optional)): return {'username': user.username} if user != None else False @router.post('/refresh') def refresh(Authorize: AuthJWT = Depends(jwt_refresh_required)): current_user = Authorize.get_jwt_subject() username = Authorize.get_raw_jwt()['username'] new_access_token = Authorize.create_access_token(subject=current_user, user_claims={"username":username}) return {"access_token": new_access_token}