248 lines
9.5 KiB
Python
248 lines
9.5 KiB
Python
|
from pydantic import BaseModel
|
||
|
import csv
|
||
|
import io
|
||
|
from typing import List
|
||
|
from fastapi.exceptions import HTTPException
|
||
|
from fastapi.responses import FileResponse, StreamingResponse
|
||
|
from fastapi import APIRouter, Depends, UploadFile, status, Query
|
||
|
from services.database import get_exo
|
||
|
from services.io import add_fast_api_root, get_filename_from_path
|
||
|
from database.exercices.crud import add_tag_db, clone_exo_db, create_exo_db, delete_exo_db, get_exo_source_path, update_exo_db
|
||
|
from services.exoValidation import validate_file
|
||
|
from database.auth.models import User
|
||
|
from services.auth import check_author_exo, get_current_user, get_current_user_optional
|
||
|
from database.exercices.models import Exercice, Tag
|
||
|
from fastapi_jwt_auth import AuthJWT
|
||
|
|
||
|
from schemas.exercices import ExerciceIn_form, ExerciceSchema, Exercices_schema, Exercice_schema, Exercices_withoutTags, Tag_schema, TagFull_schema, TagIn
|
||
|
from fastapi_pagination import paginate, Page
|
||
|
|
||
|
from generateur.generateur_csv import Csv_generator
|
||
|
|
||
|
router = APIRouter(tags=['exercices'])
|
||
|
|
||
|
|
||
|
|
||
|
def get_exercice_data(exo: Exercice, user: User | None = None):
|
||
|
return {}
|
||
|
|
||
|
# Exercices
|
||
|
|
||
|
|
||
|
class Exo(BaseModel):
|
||
|
name: str
|
||
|
id_code: str
|
||
|
tags: List[TagFull_schema]
|
||
|
|
||
|
async def get_tags(tags: List[str] | None=Query(None)):
|
||
|
if tags is None:
|
||
|
return None
|
||
|
validated_tags = []
|
||
|
for t in tags:
|
||
|
tag = await Tag.get_or_none(id_code=t)
|
||
|
if tag is not None:
|
||
|
validated_tags.append(tag)
|
||
|
return validated_tags
|
||
|
|
||
|
|
||
|
async def get_exo_by_tags(exos: Query, tags: List[Tag]):
|
||
|
valid_exos = []
|
||
|
for e in exos:
|
||
|
exo_tags = await e.tags.all()
|
||
|
if (all(t in exo_tags for t in tags)):
|
||
|
valid_exos.append(e)
|
||
|
return valid_exos
|
||
|
|
||
|
@router.get("/exercices", response_model=Page[Exo])
|
||
|
async def get_exercices(search: str = "", tags: List[Tag] | None = Depends(get_tags)):
|
||
|
exos = Exercice.all()
|
||
|
print(Exo.schema_json(indent=4))
|
||
|
print(Exercice_schema.schema_json(indent=4))
|
||
|
if tags != None:
|
||
|
exos = await exos.filter(tags__id_code__in=[t.id_code for t in tags]).distinct()
|
||
|
exos = await get_exo_by_tags(exos, tags)
|
||
|
print(exos, [e.id_code for e in exos])
|
||
|
exos = Exercice.filter(id_code__in = [e.id_code for e in exos])
|
||
|
print( await exos)
|
||
|
exos = exos.filter(name__icontains= search)
|
||
|
exo_list = await Exercices_schema.from_queryset(exos)
|
||
|
return paginate(exo_list)
|
||
|
|
||
|
async def get_exo_with_user_tags(exo: Exercice, user: User) -> Exercices_schema:
|
||
|
exo_data = await Exercice_schema.from_tortoise_orm(exo)
|
||
|
|
||
|
exo_tags = await exo.tags.all()
|
||
|
exo_data = {**exo_data.dict(), 'tags': await TagFull_schema.from_queryset(exo.tags.filter(owner_id=user.id))}
|
||
|
|
||
|
return exo_data
|
||
|
|
||
|
|
||
|
|
||
|
@router.get('/exercices/user', response_model=Page[Exercices_schema])
|
||
|
async def get_user_exercices(user: User = Depends(get_current_user), search: str = "", tags: List[Tag] | None = Depends(get_tags)):
|
||
|
|
||
|
exos = Exercice.filter(author_id=user.id)
|
||
|
print('tatgs', tags)
|
||
|
if tags != None:
|
||
|
print('lolilol')
|
||
|
exos = await exos.filter(tags__id_code__in=[t.id_code for t in tags]).distinct()
|
||
|
exos = await get_exo_by_tags(exos, tags)
|
||
|
exos = Exercice.filter(id_code__in=[e.id_code for e in exos])
|
||
|
exos = await exos.filter(name__icontains=search)
|
||
|
|
||
|
exo_list = [await get_exo_with_user_tags(e, user) for e in exos]
|
||
|
print(len(exo_list))
|
||
|
return paginate(exo_list)
|
||
|
|
||
|
|
||
|
def exclude_dict_key(dict, key):
|
||
|
return {k: v for k, v in dict.items() if k != key}
|
||
|
|
||
|
@router.get('/exercices/public', response_model=Page[Exercices_schema | Exercices_withoutTags])
|
||
|
async def get_public_exercices(user: User | None = Depends(get_current_user_optional), search: str = "", tags: List[Tag] | None = Depends(get_tags)):
|
||
|
is_authenticated = user != None
|
||
|
if is_authenticated:
|
||
|
exos = Exercice.filter(author_id__not=user.id, private = False, isOriginal=True)
|
||
|
if tags != None:
|
||
|
exos = await exos.filter(tags__id_code__in=[t.id_code for t in tags]).distinct()
|
||
|
exos = await get_exo_by_tags(exos, tags)
|
||
|
exos = Exercice.filter(id_code__in=[e.id_code for e in exos])
|
||
|
else:
|
||
|
exos = Exercice.filter(private=False, isOriginal=True)
|
||
|
|
||
|
exos = await exos.filter(name__icontains=search)
|
||
|
|
||
|
if is_authenticated:
|
||
|
exo_list = [await get_exo_with_user_tags(e, user) for e in exos]
|
||
|
else:
|
||
|
exo_list = await Exercices_withoutTags.from_queryset(Exercice.all())
|
||
|
|
||
|
return paginate(exo_list)
|
||
|
|
||
|
async def get_exo_tags(exo: Exercice, user: User) -> Exercice_schema:
|
||
|
exo_data = await Exercice_schema.from_tortoise_orm(exo)
|
||
|
|
||
|
exo_tags = await exo.tags.all()
|
||
|
exo_data = {**exo_data.dict(), 'tags': await TagFull_schema.from_queryset(exo.tags.filter(owner_id=user.id))}
|
||
|
|
||
|
return exo_data
|
||
|
|
||
|
@router.get('/exercice/{id_code}', response_model=ExerciceSchema)
|
||
|
async def get_exercice(id_code: str, user: User = Depends(get_current_user_optional)):
|
||
|
is_authenticated = user != None
|
||
|
print(TagFull_schema.schema_json(indent=4), '\n\n', ExerciceSchema.schema_json(indent=4))
|
||
|
exo = await Exercice.get_or_none(id_code=id_code)
|
||
|
if exo is None:
|
||
|
raise HTTPException(
|
||
|
status_code=status.HTTP_404_NOT_FOUND, detail="exercice not found")
|
||
|
|
||
|
if is_authenticated:
|
||
|
|
||
|
author = await exo.author
|
||
|
is_author = author.id == user.id
|
||
|
|
||
|
exo_obj = await Exercice_schema.from_tortoise_orm(exo)
|
||
|
exo_obj = await get_exo_tags(exo, user)
|
||
|
print(exo_obj)
|
||
|
exo_dict = exo_obj
|
||
|
exo_dict['is_author'] = is_author
|
||
|
exo_dict['exo_source_name'] = get_filename_from_path(exo.exo_source)
|
||
|
|
||
|
return exo_dict
|
||
|
|
||
|
exo_obj = await Exercice_schema.from_tortoise_orm(exo)
|
||
|
exo_dict = exo_obj.dict()
|
||
|
exo_dict['is_author'] = False
|
||
|
exo_dict['exo_source_name'] = get_filename_from_path(exo.exo_source)
|
||
|
exo_dict['tags'] = []
|
||
|
return exo_dict
|
||
|
|
||
|
|
||
|
@router.post("/exercices", response_model=Exercice_schema)
|
||
|
async def create_exercice(file: UploadFile = Depends(validate_file), exo: ExerciceIn_form = Depends(ExerciceIn_form.as_form), user: User = Depends(get_current_user)):
|
||
|
file_obj = file.file._file
|
||
|
file_obj.name = file.filename
|
||
|
exo_obj = await create_exo_db(**{**exo.dict(exclude_unset=True)}, exo_source=file_obj, author_id=user.id)
|
||
|
return await Exercice_schema.from_tortoise_orm(exo_obj)
|
||
|
|
||
|
|
||
|
@router.delete("/exercice/{id_code}")
|
||
|
async def delete_exercice(id_code: str, author: User = Depends(check_author_exo)):
|
||
|
await delete_exo_db(id_code)
|
||
|
return {'detail': "Exercice successfully deleted"}
|
||
|
|
||
|
|
||
|
|
||
|
@router.put("/exercice/{id_code}", response_model=Exercice_schema)
|
||
|
async def update_exercice(id_code: str, file: UploadFile = None, exo: ExerciceIn_form = Depends(ExerciceIn_form.as_form), author: User = Depends(check_author_exo)):
|
||
|
file_obj = None
|
||
|
if file != None:
|
||
|
file_obj = file.file._file
|
||
|
file_obj.name = file.filename
|
||
|
exo_obj = await update_exo_db(id_code, exo_source=file_obj, ** {**exo.dict(exclude_unset=True)})
|
||
|
return await Exercice_schema.from_tortoise_orm(exo_obj)
|
||
|
|
||
|
|
||
|
@router.post('/exercices/{id_code}/clone', response_model=Exercice_schema)
|
||
|
async def clone_exercice(id_code: str, user: User = Depends(get_current_user)):
|
||
|
exo_obj = await clone_exo_db(id_code, user.id)
|
||
|
return await Exercice_schema.from_tortoise_orm(exo_obj)
|
||
|
|
||
|
|
||
|
@router.get('/exercices/{id_code}/exo_source')
|
||
|
async def get_exo_source(id_code: str, author: User = Depends(check_author_exo)):
|
||
|
path = await get_exo_source_path(id_code)
|
||
|
filename = get_filename_from_path(path)
|
||
|
return FileResponse(path, headers={'content-disposition': 'attachment;filename='+filename})
|
||
|
|
||
|
|
||
|
# Tags
|
||
|
@router.get('/tags')
|
||
|
async def get_tags(user: User = Depends(get_current_user)):
|
||
|
return await Tag_schema.from_queryset(user.tags.all())
|
||
|
|
||
|
|
||
|
|
||
|
@router.post('/tags/{id_code}')
|
||
|
async def add_tags(id_code: str, tags: List[TagIn], user: User = Depends(get_current_user)):
|
||
|
exo = await Exercice.get(id_code = id_code)
|
||
|
exercice = await add_tag_db(exo, tags, user.id)
|
||
|
|
||
|
return {'exo': await get_exo_with_user_tags(exo, user), 'tags': await TagFull_schema.from_queryset(user.tags.all())}
|
||
|
|
||
|
|
||
|
async def check_tag_owner(tag_id: str, user: User):
|
||
|
tag = await Tag.get(id_code=tag_id)
|
||
|
owner = await tag.owner
|
||
|
if owner.id != user.id:
|
||
|
raise HTTPException(status_code = status.HTTP_401_UNAUTHORIZED, detail="Vous n'êtes pas le créateur du tag")
|
||
|
return tag
|
||
|
|
||
|
@router.delete('/tags/{id_code}/{tag_id}')
|
||
|
async def delete_tag(id_code:str,tag_id: str, user: User = Depends(get_current_user)):
|
||
|
tag = await check_tag_owner(tag_id, user)
|
||
|
exo = await Exercice.get(id_code=id_code)
|
||
|
await exo.tags.remove(tag)
|
||
|
return await get_exo_with_user_tags(exo, user)
|
||
|
|
||
|
|
||
|
# Generation
|
||
|
|
||
|
@router.get('/generator/csv/{exo_id}')
|
||
|
async def generate_csv(exo_id: str, filename: str):
|
||
|
exo = await Exercice.get(id_code=exo_id)
|
||
|
|
||
|
if exo.csvSupport == False:
|
||
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
|
||
|
detail='Impossible de générer cet exercice sur le support csv')
|
||
|
|
||
|
source_path = add_fast_api_root(exo.exo_source)
|
||
|
consigne = exo.consigne
|
||
|
|
||
|
buffer = io.StringIO()
|
||
|
writer = csv.writer(buffer, delimiter=',',
|
||
|
quotechar=',', quoting=csv.QUOTE_MINIMAL, dialect='excel') # mettre | comme sep un jour
|
||
|
Csv_generator(source_path, 10, 10, 12, consigne, writer)
|
||
|
|
||
|
return StreamingResponse(iter([buffer.getvalue()]), headers={"Content-Disposition": f'attachment;filename="{filename}'}, media_type='text/csv')
|