from fastapi import Query, HTTPException, status import os import shutil from typing import IO, List from sqlmodel import Session, select, or_ from generateur.generateur_main import generate_from_data, generate_from_path from database.auth.models import User from database.db import get_session from fastapi import Depends from database.exercices.models import ExampleEnum, ExerciceCreate, Exercice, ExerciceEdit, ExerciceRead, ExercicesTagLink, Tag, TagCreate, Supports, ExerciceReadFull from services.auth import get_current_user, get_current_user_optional from services.database import generate_unique_code from services.io import add_fast_api_root, get_ancestor, get_or_create_dir def create_exo_db(exercice: ExerciceCreate, user: User,supports: Supports, exo_source: IO, db: Session): example = { "type": ExampleEnum.csv if supports['csv'] == True else ExampleEnum.web if supports['web'] == True else None, "data": generate_from_data(exo_source.read(), 3, "csv" if supports['csv'] == True else "web" if supports['web'] == True else None, True) if supports['csv'] == True == True or supports['web'] == True == True else None } exo_db = Exercice(**exercice.dict(exclude_unset=True), author_id=user.id, id_code=generate_unique_code(Exercice, db), exo_source=exo_source, **supports, examples=example) db.add(exo_db) db.commit() db.refresh(exo_db) return exo_db def clone_exo_source(path: str, id_code: str): if not os.path.exists(add_fast_api_root(path)): return None upload_root = get_ancestor(path, 2) path = add_fast_api_root(path) new_path = add_fast_api_root( os.path.join(upload_root, id_code)) dir_path = get_or_create_dir(new_path) final_path = shutil.copy(path, dir_path) return final_path def clone_exo_db(exercice: Exercice, user: User, db: Session): if exercice.author_id == user.id: return 'Vous ne pouvez pas copier un de vos exercices' new_exo = Exercice.from_orm(exercice) new_exo.id = None new_exo.author_id = user.id new_exo.origin_id = exercice.id new_id_code = generate_unique_code(Exercice, db) new_exo.id_code = new_id_code new_exo_source = clone_exo_source(exercice.exo_source, new_id_code) if not new_exo_source: return "Erreur lors de la copie de l'exercice, fichier source introuvable" new_exo.exo_source = new_exo_source db.add(new_exo) db.commit() db.refresh(new_exo) return new_exo def update_exo_db(old_exo: Exercice, new_exo: ExerciceEdit,supports: Supports, exo_source: IO | None, db: Session): exo_data = new_exo.dict(exclude_unset=True, exclude_none=True) for key, value in exo_data.items(): setattr(old_exo, key, value) old_exo.csv = supports['csv'] old_exo.pdf = supports['pdf'] old_exo.web = supports['web'] example = { "type": ExampleEnum.csv if supports['csv'] == True else ExampleEnum.web if supports['web'] == True else None, "data": generate_from_data(exo_source.read(), 3, "csv" if supports['csv'] == True else "web" if supports['web'] == True else None, True) if supports['csv'] == True == True or supports['web'] == True == True else None } old_exo.examples = example if exo_source: os.remove(add_fast_api_root(old_exo.exo_source)) old_exo.exo_source = exo_source db.add(old_exo) db.commit() db.refresh(old_exo) return old_exo def delete_exo_db(exo: Exercice, db: Session): db.delete(exo) db.commit() return True def get_or_create_tag(tag: TagCreate, user: User, db: Session): tag_db = db.exec(select(Tag).where(Tag.author_id == user.id).where(or_( Tag.id_code == tag.id_code, Tag.label == tag.label))).first() if tag_db is not None: return tag_db id_code = generate_unique_code(Tag, db) tag_db = Tag(**{**tag.dict(exclude_unset=True), 'id_code': id_code, 'author_id': user.id}) db.add(tag_db) db.commit() db.refresh(tag_db) return tag_db def add_tags_db(exo: Exercice, tags: List[TagCreate], user: User, db: Session): for tag in tags: tag_db = get_or_create_tag(tag, user, db) exo.tags.append(tag_db) db.add(exo) db.commit() db.refresh(exo) return exo def remove_tag_db(exo: Exercice, tag: Tag, db: Session): exo.tags.remove(tag) db.add(tag) db.commit() return exo def parse_exo_tags(exo_id: int, user_id: int, db: Session): exo_tags = db.exec(select(Tag, ExercicesTagLink).where(ExercicesTagLink.exercice_id == exo_id, Tag.id == ExercicesTagLink.tag_id, Tag.author_id == user_id)).all() # select -> (Exercice, ExerciceLink) exo_tags = [t[0] for t in exo_tags] return exo_tags # Dependencies def get_exo_dependency(id_code: str, db: Session = Depends(get_session)): exo = db.exec(select(Exercice).where( Exercice.id_code == id_code)).first() return exo def check_private(exo: Exercice = Depends(get_exo_dependency), user: User = Depends(get_current_user_optional)): if user is not None and exo.author_id == user.id: return exo if exo.private is True: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Cet exercice est privé') return exo def check_exercice_author(exo: Exercice | None = Depends(get_exo_dependency), user: User = Depends(get_current_user)): if not exo: return None if exo.author_id != user.id: return False return exo def check_tag_author(tag_id: str, user: User = Depends(get_current_user), db: Session = Depends(get_session)): tag = db.exec(select(Tag).where(Tag.id_code == tag_id)).first() if not tag: return None is_owner = tag.author_id == user.id if is_owner == False: return False return tag def get_tags_dependency(tags: List[str] | None = Query(None), db: Session = Depends(get_session)): if tags is None: return None validated_tags = [] for t in tags: tag = db.exec(select(Tag.id).where(Tag.id_code == t)).first() if tag is not None: validated_tags.append(tag) return validated_tags #Serialize def check_author(exo: Exercice, user_id:int): return exo.author_id == user_id def serialize_exo(*, exo: ExerciceRead, user_id: User = None, db: Session): tags = parse_exo_tags(exo_id=exo.id, user_id=user_id, db=db) if user_id is not None else [] is_author = user_id is not None and check_author(exo=exo, user_id=user_id) return ExerciceReadFull(**exo.dict(), author=exo.author, original=exo.original, tags=tags, is_author=is_author, supports={**exo.dict()})