import os
import shutil
from contextlib import suppress
from typing import IO, List

from fastapi import Depends, HTTPException, Query, status
from sqlmodel import Session, col, or_, select

from generateur.generateur_main import generate_from_data, generate_from_path
from database.auth.models import User
from database.db import get_session
from database.exercices.models import (
    ExampleEnum,
    ExerciceCreate,
    Exercice,
    ExerciceEdit,
    ExerciceRead,
    ExercicesTagLink,
    Tag,
    TagCreate,
    Supports,
    ExerciceReadFull,
)
from services.auth import get_current_user, get_current_user_optional
from services.database import generate_unique_code
from services.io import add_fast_api_root, get_ancestor, get_or_create_dir


def _example_format(supports: Supports):
    """Return ``(ExampleEnum member, format name)`` for the preferred support.

    ``csv`` takes precedence over ``web``; ``(None, None)`` is returned when
    neither flag is set. The flags are evaluated for truthiness
    (NOTE(review): assumes the ``Supports`` values are plain booleans).
    """
    if supports['csv']:
        return ExampleEnum.csv, "csv"
    if supports['web']:
        return ExampleEnum.web, "web"
    return None, None


def _build_example(supports: Supports, exo_source: IO | None):
    """Build the ``examples`` payload stored on an exercise.

    Returns a dict with the keys ``"type"`` and ``"data"``. ``"data"`` is the
    result of ``generate_from_data`` called on the content of ``exo_source``
    when csv or web is supported, and ``None`` otherwise.

    Returns ``None`` (no payload at all) when examples would have to be
    generated but no source stream was supplied.
    """
    example_type, format_name = _example_format(supports)
    if format_name is None:
        return {"type": None, "data": None}
    if exo_source is None:
        # Nothing to generate from; let the caller decide what to keep.
        return None
    # NOTE(review): the stream is consumed here and is not rewound before the
    # caller stores it on the model — confirm the storage layer handles that.
    data = generate_from_data(exo_source.read(), 3, format_name, True)
    return {"type": example_type, "data": data}


def create_exo_db(exercice: ExerciceCreate, user: User, supports: Supports,
                  exo_source: IO, db: Session):
    """Create an ``Exercice`` row owned by ``user`` and return it.

    The row gets a freshly generated unique ``id_code``, the given support
    flags, the source stream and the examples generated from that source.
    The new row is committed and refreshed before being returned.
    """
    example = _build_example(supports, exo_source)
    exo_db = Exercice(
        **exercice.dict(exclude_unset=True),
        author_id=user.id,
        id_code=generate_unique_code(Exercice, db),
        exo_source=exo_source,
        **supports,
        examples=example,
    )
    db.add(exo_db)
    db.commit()
    db.refresh(exo_db)
    return exo_db


def clone_exo_source(path: str, id_code: str):
    """Copy an exercise source file into a directory named after ``id_code``.

    The target directory is ``<ancestor of path at depth 2>/<id_code>``,
    resolved through ``add_fast_api_root`` and created if needed.

    Returns the path returned by ``shutil.copy``, or ``None`` when the source
    file does not exist.
    """
    source_path = add_fast_api_root(path)
    if not os.path.exists(source_path):
        return None

    upload_root = get_ancestor(path, 2)
    target_dir = get_or_create_dir(
        add_fast_api_root(os.path.join(upload_root, id_code)))
    return shutil.copy(source_path, target_dir)


def clone_exo_db(exercice: Exercice, user: User, db: Session):
    """Duplicate ``exercice`` for ``user``.

    Returns the new committed ``Exercice`` on success. On failure an error
    message (``str``) is returned instead: when ``user`` already is the
    author, or when the source file could not be copied.
    """
    if exercice.author_id == user.id:
        return 'Vous ne pouvez pas copier un de vos exercices'

    new_exo = Exercice.from_orm(exercice)
    new_exo.id = None  # force an INSERT instead of updating the original row
    new_exo.author_id = user.id
    new_exo.origin_id = exercice.id

    new_id_code = generate_unique_code(Exercice, db)
    new_exo.id_code = new_id_code

    new_exo_source = clone_exo_source(exercice.exo_source, new_id_code)
    if not new_exo_source:
        return "Erreur lors de la copie de l'exercice, fichier source introuvable"
    new_exo.exo_source = new_exo_source

    db.add(new_exo)
    db.commit()
    db.refresh(new_exo)
    return new_exo


def update_exo_db(old_exo: Exercice, new_exo: ExerciceEdit,
                  supports: Supports | None, exo_source: IO | None,
                  db: Session):
    """Apply an edit to ``old_exo``, commit it and return the refreshed row.

    - Every field explicitly set (and not ``None``) on ``new_exo`` is copied.
    - When ``supports`` is given, the csv/pdf/web flags are replaced and the
      examples are regenerated from ``exo_source``. If examples would need a
      source but ``exo_source`` is ``None``, the existing examples are kept.
    - When ``exo_source`` is given, the previous source file is removed and
      the new source is stored.
    """
    exo_data = new_exo.dict(exclude_unset=True, exclude_none=True)
    for key, value in exo_data.items():
        setattr(old_exo, key, value)

    if supports is not None:
        old_exo.csv = supports['csv']
        old_exo.pdf = supports['pdf']
        old_exo.web = supports['web']

        example = _build_example(supports, exo_source)
        if example is not None:
            old_exo.examples = example

    if exo_source:
        # The old file may already be gone; that must not block the update.
        with suppress(FileNotFoundError):
            os.remove(add_fast_api_root(old_exo.exo_source))
        old_exo.exo_source = exo_source

    db.add(old_exo)
    db.commit()
    db.refresh(old_exo)
    return old_exo


def delete_exo_db(exo: Exercice, db: Session):
    """Delete ``exo``, commit, and return ``True``."""
    db.delete(exo)
    db.commit()
    return True


def get_or_create_tag(tag: TagCreate, user: User, db: Session):
    """Return ``(tag, created)`` for ``user``.

    An existing tag of ``user`` is reused when it matches ``tag`` on either
    ``id_code`` or ``label``; otherwise a new tag with a generated unique
    ``id_code`` is committed. ``created`` tells which of the two happened.
    """
    query = (
        select(Tag)
        .where(Tag.author_id == user.id)
        .where(or_(Tag.id_code == tag.id_code, Tag.label == tag.label))
    )
    tag_db = db.exec(query).first()
    if tag_db is not None:
        return tag_db, False

    id_code = generate_unique_code(Tag, db)
    tag_db = Tag(**{**tag.dict(exclude_unset=True),
                    'id_code': id_code, 'author_id': user.id})
    db.add(tag_db)
    db.commit()
    db.refresh(tag_db)
    return tag_db, True


def add_tags_db(exo: Exercice, tags: List[TagCreate], user: User, db: Session):
    """Attach ``tags`` to ``exo`` and return ``(exo, newly_created_tags)``.

    Each tag is fetched or created through ``get_or_create_tag``. A tag that
    is already attached to ``exo`` is not attached a second time.
    """
    new = []
    for tag in tags:
        tag_db, created = get_or_create_tag(tag, user, db)
        if created:
            new.append(tag_db)
        # Avoid a duplicate exercise/tag link when the tag is already set.
        if tag_db not in exo.tags:
            exo.tags.append(tag_db)

    db.add(exo)
    db.commit()
    db.refresh(exo)
    return exo, new


def remove_tag_db(exo: Exercice, tag: Tag, db: Session):
    """Detach ``tag`` from ``exo``, commit, and return ``exo``.

    ``list.remove`` raises ``ValueError`` when ``tag`` is not attached.
    """
    exo.tags.remove(tag)
    db.add(tag)
    db.commit()
    return exo


def parse_exo_tags(exo_id: int, user_id: int, db: Session):
    """Return the tags linked to exercise ``exo_id`` that ``user_id`` authored."""
    rows = db.exec(
        select(Tag, ExercicesTagLink).where(
            ExercicesTagLink.exercice_id == exo_id,
            Tag.id == ExercicesTagLink.tag_id,
            Tag.author_id == user_id,
        )
    ).all()
    # Each row is a (Tag, ExercicesTagLink) pair; only the tag is wanted.
    return [row[0] for row in rows]


# Dependencies


def get_exo_dependency(id_code: str, db: Session = Depends(get_session)):
    """Return the exercise whose ``id_code`` matches, or ``None``."""
    return db.exec(
        select(Exercice).where(Exercice.id_code == id_code)).first()


def check_private(exo: Exercice = Depends(get_exo_dependency),
                  user: User = Depends(get_current_user_optional)):
    """Return ``exo`` if the (optional) current user may read it.

    Raises:
        HTTPException: 404 when no exercise was found; 401 when the exercise
            is private and the requester is not its author.
    """
    if exo is None:
        # The lookup dependency yields None for an unknown id_code.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail='Exercice introuvable')
    if user is not None and exo.author_id == user.id:
        return exo
    if exo.private is True:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail='Cet exercice est privé')
    return exo


def check_exercice_author(exo: Exercice | None = Depends(get_exo_dependency),
                          user: User = Depends(get_current_user)):
    """Return ``exo`` when ``user`` is its author.

    Returns ``None`` when no exercise was found and ``False`` when the
    exercise belongs to someone else.
    """
    if not exo:
        return None
    if exo.author_id != user.id:
        return False
    return exo


def check_tag_author(tag_id: str, user: User = Depends(get_current_user),
                     db: Session = Depends(get_session)):
    """Return the tag with ``id_code == tag_id`` when ``user`` is its author.

    Returns ``None`` when no such tag exists and ``False`` when the tag
    belongs to someone else.
    """
    tag = db.exec(select(Tag).where(Tag.id_code == tag_id)).first()
    if not tag:
        return None
    if tag.author_id != user.id:
        return False
    return tag


def get_tags_dependency(tags: List[str] | None = Query(None),
                        db: Session = Depends(get_session)):
    """Translate the ``tags`` query parameter (tag id_codes) into tag ids.

    Codes that match no tag are dropped. Returns an empty list when the
    parameter is absent.
    """
    if tags is None:
        return []
    validated_tags = db.exec(
        select(Tag.id_code, Tag.id).where(col(Tag.id_code).in_(tags))).all()
    return [t.id for t in validated_tags]


# Serialize


def check_author(exo: Exercice, user_id: int):
    """Return whether ``user_id`` is the author of ``exo``."""
    return exo.author_id == user_id


def serialize_exo(*, exo: Exercice, user_id: int | None = None, db: Session):
    """Build the ``ExerciceReadFull`` representation of ``exo``.

    Args:
        exo: The exercise to serialize.
        user_id: Id of the requesting user, or ``None`` for an anonymous
            request (then ``tags`` is empty and ``is_author`` is ``False``).
        db: Session used to load the tags and the original exercise's author.

    The ``original`` entry is ``None`` unless ``exo.original`` is set; in that
    case it is the original's fields plus ``author`` (the author's username,
    or ``None`` if that user row no longer exists).
    """
    if user_id is not None:
        tags = parse_exo_tags(exo_id=exo.id, user_id=user_id, db=db)
        is_author = check_author(exo=exo, user_id=user_id)
    else:
        tags = []
        is_author = False

    if exo.original is not None:
        author = db.exec(
            select(User).where(User.id == exo.original.author_id)).first()
        original = {
            **exo.original.dict(),
            'author': author.username if author is not None else None,
        }
    else:
        original = None

    exo_data = exo.dict()
    return ExerciceReadFull(**{
        **exo_data,
        "author": exo.author,
        "original": original,
        "tags": tags,
        "is_author": is_author,
        "supports": {**exo_data},
    })