generateur_v3/backend/api/database/exercices/crud.py
2023-01-27 21:41:08 +01:00

203 lines
7.1 KiB
Python

from fastapi import Query, HTTPException, status
import os
import shutil
from typing import IO, List
from sqlmodel import Session, select, or_, col
from generateur.generateur_main import generate_from_data, generate_from_path
from database.auth.models import User
from database.db import get_session
from fastapi import Depends
from database.exercices.models import ExampleEnum, ExerciceCreate, Exercice, ExerciceEdit, ExerciceRead, ExercicesTagLink, Tag, TagCreate, Supports, ExerciceReadFull
from services.auth import get_current_user, get_current_user_optional
from services.database import generate_unique_code
from services.io import add_fast_api_root, get_ancestor, get_or_create_dir
def create_exo_db(exercice: ExerciceCreate, user: User, supports: Supports, exo_source: IO, db: Session):
    """Create an exercice row owned by ``user`` and return it.

    Args:
        exercice: Creation payload; only the fields that were explicitly set
            are copied onto the new row.
        user: Author of the new exercice (``user.id`` becomes ``author_id``).
        supports: Mapping of output-format flags. The ``'csv'`` and ``'web'``
            keys are read here and the whole mapping is spread onto the model.
        exo_source: Open file object holding the exercice source.
        db: Session used to persist the row.

    Returns:
        The ``Exercice`` instance after it has been committed and refreshed.
    """
    # CSV takes precedence over web when both formats are supported.
    if supports['csv']:
        example_type, example_format = ExampleEnum.csv, "csv"
    elif supports['web']:
        example_type, example_format = ExampleEnum.web, "web"
    else:
        example_type, example_format = None, None

    example_data = None
    if example_format is not None:
        # NOTE(review): read() consumes the stream before the same object is
        # handed to the model below - confirm the storage layer rewinds it.
        example_data = generate_from_data(
            exo_source.read(), 3, example_format, True)

    example = {"type": example_type, "data": example_data}

    exo_db = Exercice(
        **exercice.dict(exclude_unset=True),
        author_id=user.id,
        id_code=generate_unique_code(Exercice, db),
        exo_source=exo_source,
        **supports,
        examples=example,
    )
    db.add(exo_db)
    db.commit()
    db.refresh(exo_db)
    return exo_db
def clone_exo_source(path: str, id_code: str):
    """Copy an exercice source file into a directory named after ``id_code``.

    Args:
        path: Location of the existing source file; it is resolved through
            ``add_fast_api_root`` before being checked and copied.
        id_code: Code of the new exercice, used as the destination directory
            name.

    Returns:
        The value returned by ``shutil.copy`` (path of the copied file), or
        ``None`` when the source file does not exist.
    """
    source_file = add_fast_api_root(path)
    if not os.path.exists(source_file):
        return None

    # The destination sits under get_ancestor(path, 2) - presumably the
    # upload root two levels above the file; confirm against services.io.
    upload_root = get_ancestor(path, 2)
    target_dir = get_or_create_dir(
        add_fast_api_root(os.path.join(upload_root, id_code)))

    # NOTE(review): the returned path is derived from add_fast_api_root
    # output, whereas the input path is given without it - confirm callers
    # expect a rooted path here.
    return shutil.copy(source_file, target_dir)
def clone_exo_db(exercice: Exercice, user: User, db: Session):
    """Duplicate another author's exercice for ``user``.

    Args:
        exercice: Exercice to copy.
        user: User who will own the copy.
        db: Session used to persist the copy.

    Returns:
        The refreshed copy on success. On failure an error message string is
        returned instead: either because ``user`` already authored
        ``exercice`` or because the source file could not be cloned.
    """
    if user.id == exercice.author_id:
        return 'Vous ne pouvez pas copier un de vos exercices'

    duplicate = Exercice.from_orm(exercice)
    # Clearing the id makes the copy a new row; origin_id keeps the link to
    # the exercice it was cloned from.
    duplicate.id = None
    duplicate.author_id = user.id
    duplicate.origin_id = exercice.id

    fresh_code = generate_unique_code(Exercice, db)
    duplicate.id_code = fresh_code

    copied_source = clone_exo_source(exercice.exo_source, fresh_code)
    if not copied_source:
        return "Erreur lors de la copie de l'exercice, fichier source introuvable"
    duplicate.exo_source = copied_source

    db.add(duplicate)
    db.commit()
    db.refresh(duplicate)
    return duplicate
def update_exo_db(old_exo: Exercice, new_exo: ExerciceEdit, supports: Supports | None, exo_source: IO | None, db: Session):
    """Apply an edit payload (and optionally a new source file) to an exercice.

    Args:
        old_exo: Persisted exercice to modify in place.
        new_exo: Edit payload; only fields that were set and are not ``None``
            are copied onto ``old_exo``.
        supports: Optional mapping with ``'csv'``, ``'pdf'`` and ``'web'``
            flags. When given, the flags are stored and the examples are
            rebuilt.
        exo_source: Optional open file object with the new source. When
            given, the previous source file is removed from disk and
            replaced.
        db: Session used to persist the changes.

    Returns:
        ``old_exo`` after it has been committed and refreshed.
    """
    for key, value in new_exo.dict(exclude_unset=True, exclude_none=True).items():
        setattr(old_exo, key, value)

    if supports is not None:
        old_exo.csv = supports['csv']
        old_exo.pdf = supports['pdf']
        old_exo.web = supports['web']

        # CSV takes precedence over web when both formats are supported.
        if supports['csv']:
            example_type, example_format = ExampleEnum.csv, "csv"
        elif supports['web']:
            example_type, example_format = ExampleEnum.web, "web"
        else:
            example_type, example_format = None, None

        if example_format is None:
            old_exo.examples = {"type": None, "data": None}
        elif exo_source is not None:
            old_exo.examples = {
                "type": example_type,
                "data": generate_from_data(
                    exo_source.read(), 3, example_format, True),
            }
        # Otherwise no new source was supplied: there is nothing to generate
        # examples from, so the stored examples are left untouched instead of
        # failing on ``None.read()``.

    if exo_source:
        try:
            os.remove(add_fast_api_root(old_exo.exo_source))
        except FileNotFoundError:
            # The previous file is already gone; that must not prevent the
            # new source from being stored.
            pass
        old_exo.exo_source = exo_source

    db.add(old_exo)
    db.commit()
    db.refresh(old_exo)
    return old_exo
def delete_exo_db(exo: Exercice, db: Session) -> bool:
    """Delete ``exo`` through ``db``, commit, and return ``True``."""
    db.delete(exo)
    db.commit()
    return True
def get_or_create_tag(tag: TagCreate, user: User, db: Session):
    """Fetch a tag of ``user`` matching ``tag``, creating it when absent.

    A tag matches when it belongs to ``user`` and shares either the
    ``id_code`` or the ``label`` of the payload.

    Returns:
        A ``(tag, created)`` tuple where ``created`` is ``True`` only when a
        new row was inserted.
    """
    same_code_or_label = or_(
        Tag.id_code == tag.id_code, Tag.label == tag.label)
    statement = (
        select(Tag)
        .where(Tag.author_id == user.id)
        .where(same_code_or_label)
    )
    existing = db.exec(statement).first()
    if existing is not None:
        return existing, False

    # The generated code and the author always override whatever the payload
    # carried for those two keys.
    new_code = generate_unique_code(Tag, db)
    fields = tag.dict(exclude_unset=True)
    fields['id_code'] = new_code
    fields['author_id'] = user.id

    created_tag = Tag(**fields)
    db.add(created_tag)
    db.commit()
    db.refresh(created_tag)
    return created_tag, True
def add_tags_db(exo: Exercice, tags: List[TagCreate], user: User, db: Session):
    """Attach ``tags`` to ``exo``, creating the ones ``user`` does not have yet.

    Returns:
        A ``(exo, new_tags)`` tuple: the refreshed exercice and the list of
        tags that had to be created during this call.
    """
    freshly_created = []
    for payload in tags:
        tag_row, was_created = get_or_create_tag(payload, user, db)
        # Every tag is linked to the exercice, whether it already existed or
        # not; only the new ones are reported back.
        exo.tags.append(tag_row)
        if was_created:
            freshly_created.append(tag_row)

    db.add(exo)
    db.commit()
    db.refresh(exo)
    return exo, freshly_created
def remove_tag_db(exo: Exercice, tag: Tag, db: Session):
    """Detach ``tag`` from ``exo``, commit, and return the exercice.

    ``exo.tags.remove`` is called directly, so the call fails if ``tag`` is
    not currently in ``exo.tags``.
    """
    exo.tags.remove(tag)
    # The tag itself (not the exercice) is what gets added to the session
    # before committing.
    db.add(tag)
    db.commit()
    return exo
def parse_exo_tags(exo_id: int, user_id: int, db: Session):
    """Return the tags authored by ``user_id`` that are linked to ``exo_id``."""
    statement = select(Tag, ExercicesTagLink).where(
        ExercicesTagLink.exercice_id == exo_id,
        Tag.id == ExercicesTagLink.tag_id,
        Tag.author_id == user_id,
    )
    rows = db.exec(statement).all()
    # Each row is a (Tag, ExercicesTagLink) pair; only the Tag is kept.
    return [tag for tag, _link in rows]
# Dependencies
def get_exo_dependency(id_code: str, db: Session = Depends(get_session)):
    """Look up an exercice by ``id_code``; yields ``None`` when none matches."""
    statement = select(Exercice).where(Exercice.id_code == id_code)
    return db.exec(statement).first()
def check_private(exo: Exercice | None = Depends(get_exo_dependency), user: User = Depends(get_current_user_optional)):
    """Return the requested exercice if the current user may read it.

    The author can always read their own exercice. Anyone else (including
    anonymous requests, where ``user`` is ``None``) is rejected when the
    exercice is private.

    Raises:
        HTTPException: 404 when no exercice matches the requested id_code,
            401 when the exercice is private and the user is not its author.
    """
    # get_exo_dependency returns None for an unknown id_code; report that as
    # a 404 instead of failing on attribute access below.
    if exo is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail='Exercice introuvable')
    if user is not None and exo.author_id == user.id:
        return exo
    if exo.private is True:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail='Cet exercice est privé')
    return exo
def check_exercice_author(exo: Exercice | None = Depends(get_exo_dependency), user: User = Depends(get_current_user)):
    """Resolve the requested exercice for its author.

    Returns:
        ``None`` when the exercice does not exist, ``False`` when the current
        user is not its author, otherwise the exercice itself.
    """
    if not exo:
        return None
    return exo if exo.author_id == user.id else False
def check_tag_author(tag_id: str, user: User = Depends(get_current_user), db: Session = Depends(get_session)):
    """Resolve a tag by its ``id_code`` for its author.

    Returns:
        ``None`` when no tag has this code, ``False`` when the current user
        does not own it, otherwise the tag itself.
    """
    statement = select(Tag).where(Tag.id_code == tag_id)
    found = db.exec(statement).first()
    if not found:
        return None
    return found if found.author_id == user.id else False
def get_tags_dependency(tags: List[str] | None = Query(None), db: Session = Depends(get_session)):
    """Translate the ``tags`` query parameter (id_codes) into tag ids.

    Codes that match no tag are dropped. Returns an empty list when the
    parameter is absent.
    """
    if tags is None:
        return []
    statement = select(Tag.id_code, Tag.id).where(
        col(Tag.id_code).in_(tags))
    rows = db.exec(statement).all()
    return [row.id for row in rows]
# Serialization helpers
def check_author(exo: Exercice, user_id: int):
    """Tell whether ``user_id`` is the author of ``exo``."""
    return user_id == exo.author_id
def serialize_exo(*, exo: Exercice, user_id: int | None = None, db: Session):
    """Build the full read model of ``exo`` as seen by ``user_id``.

    Args:
        exo: Exercice to serialize.
        user_id: Id of the requesting user, or ``None`` for an anonymous
            request. It selects which tags are included and whether
            ``is_author`` is set.
        db: Session used to load the tags and the original exercice's author.

    Returns:
        An ``ExerciceReadFull`` built from the exercice's own fields plus
        ``author``, ``original``, ``tags``, ``is_author`` and ``supports``.
    """
    # Tags are per-user, so an anonymous request gets none.
    tags = parse_exo_tags(exo_id=exo.id, user_id=user_id,
                          db=db) if user_id is not None else []
    is_author = user_id is not None and check_author(exo=exo, user_id=user_id)

    original = None
    if exo.original is not None:
        original_author = db.exec(
            select(User).where(User.id == exo.original.author_id)).first()
        original = {
            **exo.original.dict(),
            # Fall back to None rather than failing if the author row is
            # missing.
            'author': original_author.username if original_author is not None else None,
        }

    exo_fields = exo.dict()
    return ExerciceReadFull(**{
        **exo_fields,
        "author": exo.author,
        "original": original,
        "tags": tags,
        "is_author": is_author,
        # The complete field dict is passed as "supports", exactly as before;
        # presumably the read model keeps only the csv/pdf/web flags - confirm
        # against ExerciceReadFull.
        "supports": {**exo_fields},
    })