generateur_v3/backend/api/routes/exercices/routes.py

213 lines
9.9 KiB
Python

from enum import Enum
from typing import List
from fastapi import APIRouter, Depends, Path, Query, UploadFile, HTTPException, status
from database.auth.models import User
from database.db import get_session
from database.exercices.models import Exercice, ExerciceCreate, ExerciceEdit, ExerciceReadFull, ExercicesTagLink, Tag, TagCreate, TagRead, ExerciceRead
from services.auth import get_current_user, get_current_user_optional
from sqlmodel import Session, select, col
from database.exercices.crud import add_tags_db, check_exercice_author, check_private, check_tag_author, create_exo_db, delete_exo_db, get_exo_dependency, clone_exo_db, parse_exo_tags, remove_tag_db, serialize_exo, update_exo_db, get_tags_dependency
from services.exoValidation import validate_file, validate_file_optionnal
from services.io import add_fast_api_root, get_filename_from_path
from fastapi.responses import FileResponse
from sqlmodel import func
from fastapi_pagination import Page, paginate
from fastapi_pagination.ext.sqlalchemy_future import paginate as p
router = APIRouter(tags=['exercices'])
class ExoType(str, Enum):
csv="csv"
pdf="pdf"
web="web"
def filter_exo_by_tags(exos: List[tuple[Exercice, str]], tags: List[Tag]):
valid_exos = [exo for exo, tag in exos if all(
str(t) in tag.split(',') for t in tags)]
return valid_exos
def queryFilters_dependency(search: str = "", tags: List[int] | None = Depends(get_tags_dependency), type: ExoType | None = Query(default = None)):
return search, tags, type
@router.post('/exercices', response_model=ExerciceReadFull, status_code=status.HTTP_201_CREATED)
def create_exo(exercice: ExerciceCreate = Depends(ExerciceCreate.as_form), file: UploadFile = Depends(validate_file), user: User = Depends(get_current_user), db: Session = Depends(get_session)):
file_obj = file['file'].file._file
file_obj.name = file['file'].filename
exo_obj = create_exo_db(exercice=exercice, user=user,
exo_source=file_obj, supports=file['supports'],db=db)
return serialize_exo(exo=exo_obj, user_id=user.id, db=db)
@router.post('/clone/{id_code}', response_model=ExerciceReadFull)
def clone_exo(exercice: Exercice | None = Depends(check_private), user: User = Depends(get_current_user), db: Session = Depends(get_session)):
if not exercice:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={
"Exercice introuvable"})
exo_obj = clone_exo_db(exercice, user, db)
if type(exo_obj) == str:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=exo_obj)
return serialize_exo(exo=exo_obj, user_id=user.id, db=db)
@router.get('/exercices/user', response_model=Page[ExerciceRead|ExerciceReadFull])
def get_user_exercices(user: User = Depends(get_current_user), queryFilters: tuple[str, List[int] | None] = Depends(queryFilters_dependency), db: Session = Depends(get_session)):
search, tags, type = queryFilters
if tags is not None and len(tags) != 0:
statement = select(Exercice, func.group_concat(
ExercicesTagLink.tag_id))
else:
statement = select(Exercice)
statement = statement.where(Exercice.author_id == user.id)
statement = statement.where(Exercice.name.startswith(search))
if type == ExoType.csv:
statement = statement.where(Exercice.csv == True)
if type == ExoType.pdf:
statement = statement.where(Exercice.pdf == True)
if type == ExoType.web:
statement = statement.where(Exercice.web == True)
if tags is not None and len(tags) != 0:
statement = statement.join(ExercicesTagLink).where(
col(ExercicesTagLink.tag_id).in_(tags)).group_by(ExercicesTagLink.exercice_id)
#exercices = db.exec(statement).all()
page = p(db, statement)
exercices = page.items
if tags is not None and len(tags) != 0:
exercices = filter_exo_by_tags(exercices, tags)
page.items = [
serialize_exo(exo=e, user_id=user.id, db=db) for e in exercices]
return page
@router.get('/exercices/public', response_model=Page[ExerciceRead|ExerciceReadFull])
def get_public_exercices(user: User | None = Depends(get_current_user_optional), queryFilters: tuple[str, List[int] | None] = Depends(queryFilters_dependency), db: Session = Depends(get_session)):
search, tags, type = queryFilters
if user is not None:
if tags is not None and len(tags) != 0:
statement = select(Exercice, func.group_concat(
ExercicesTagLink.tag_id))
else:
statement = select(Exercice)
statement = statement.where(Exercice.author_id != user.id)
statement = statement.where(Exercice.private == False)
statement = statement.where(Exercice.name.startswith(search))
if type == ExoType.csv:
statement = statement.where(Exercice.csv == True)
if type == ExoType.pdf:
statement = statement.where(Exercice.pdf == True)
if type == ExoType.web:
statement = statement.where(Exercice.web == True)
if tags is not None and len(tags) != 0:
statement = statement.join(ExercicesTagLink).where(
col(ExercicesTagLink.tag_id).in_(tags)).group_by(ExercicesTagLink.exercice_id)
exercices = db.exec(statement).all()
if tags is not None and len(tags) != 0:
exercices = filter_exo_by_tags(exercices, tags)
exercices = [
serialize_exo(exo=e, user_id=user.id, db=db) for e in exercices]
return exercices
else:
statement = select(Exercice)
statement = statement.where(Exercice.private == False)
statement = statement.where(Exercice.name.startswith(search))
if type == ExoType.csv:
statement = statement.where(Exercice.csv == True)
if type == ExoType.pdf:
statement = statement.where(Exercice.pdf == True)
if type == ExoType.web:
statement = statement.where(Exercice.web == True)
exercices = db.exec(statement).all()
return paginate([serialize_exo(exo=e, user_id=None, db=db) for e in exercices])
@router.get('/exercice/{id_code}', response_model=ExerciceReadFull)
def get_exercice(exo: Exercice = Depends(check_private), user: User | None = Depends(get_current_user_optional), db: Session = Depends(get_session)):
return serialize_exo(exo=exo, user_id=getattr(user, 'id', None), db=db)
@router.put('/exercice/{id_code}', response_model=ExerciceReadFull)
def update_exo(file: UploadFile = Depends(validate_file_optionnal), exo: Exercice = Depends(check_exercice_author), exercice: ExerciceEdit = Depends(ExerciceEdit.as_form), db: Session = Depends(get_session)):
if exo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Exercice introuvable')
if exo == False:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='Cet exercice ne vous appartient pas')
file_obj = None
if file:
file_obj = file["file"].file._file
file_obj.name = file['file'].filename
exo_obj = update_exo_db(exo, exercice,file['supports'] if file is not None else None, file_obj, db)
return serialize_exo(exo=exo_obj, user_id=exo_obj.author_id, db=db)
@router.delete('/exercice/{id_code}')
def delete_exercice(exercice: Exercice | bool | None = Depends(check_exercice_author), db: Session = Depends(get_session)):
if exercice is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail="Exercice introuvable")
if exercice == False:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='Cet exercice ne vous appartient pas')
delete_exo_db(exercice, db)
return {'detail': 'Exercice supprimé avec succès'}
@router.post('/exercice/{id_code}/tags', response_model=ExerciceReadFull, tags=['tags'])
def add_tags(tags: List[TagCreate], exo: Exercice | None = Depends(get_exo_dependency), db: Session = Depends(get_session), user: User = Depends(get_current_user)):
if exo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Exercice introuvable')
exo_obj = add_tags_db(exo, tags, user, db)
return serialize_exo(exo=exo_obj, user_id=user.id, db=db)
@router.delete('/exercice/{id_code}/tags/{tag_id}', response_model=ExerciceReadFull, tags=['tags'])
def remove_tag(exo: Exercice | None = Depends(get_exo_dependency), tag: Tag | None | bool = Depends(check_tag_author), db: Session = Depends(get_session)):
if exo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail='Exercice introuvable')
if tag is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Tag introuvable')
if tag == False:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail="Vous n'êtes pas le propriétaire du tag")
exo_obj = remove_tag_db(exo, tag, db)
return serialize_exo(exo=exo_obj, user_id=tag.author_id, db=db)
@router.get('/exercice/{id_code}/exo_source')
async def get_exo_source(exo: Exercice = Depends(check_exercice_author), db: Session = Depends(get_session)):
if exo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail='Exercice introuvable')
if exo == False:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='Cet exercice ne vous appartient pas')
path = add_fast_api_root(exo.exo_source)
filename = get_filename_from_path(path)
return FileResponse(path, headers={'content-disposition': 'attachment;filename='+filename})
@router.get('/tags', response_model=List[TagRead], tags=['tags'])
def get_tags(user: User = Depends(get_current_user), db: Session = Depends(get_session)):
return user.tags