import uuid from copy import deepcopy from typing import List from fastapi import Depends, HTTPException, status, Query from pydantic import BaseModel from sqlalchemy import func from sqlmodel import Session, delete, select, col, update from database.auth.crud import get_user_from_token from database.auth.models import User from database.db import get_session from database.exercices.models import Exercice from database.room.models import Anonymous, Challenge, Challenges, CorrigedGeneratorOut, Exercices, ExercicesCreate, \ Member, Note, Parcours, ParcoursCreate, ParcoursReadShort, ParsedGeneratorOut, Room, RoomCreate, RoomInfo, \ TmpCorrection, Waiter, MemberRead, CorrigedData, CorrectionData, Challenger from services.auth import get_current_user_optional from services.database import generate_unique_code from uuid import UUID, uuid4 def create_room_db(*, room: RoomCreate, user: User | None = None, username: str | None = None, db: Session): id_code = generate_unique_code(Room, s=db) member_id = generate_unique_code(Member, s=db) room_obj = Room(**room.dict(exclude_unset=True), id_code=id_code) if user is not None: member = Member(user_id=user.id, room=room_obj, is_admin=True, id_code=member_id) db.add(member) db.commit() db.refresh(member) if username is not None: reconnect_code = generate_unique_code( Anonymous, s=db, field_name='reconnect_code') anonymous = Anonymous(username=username, reconnect_code=reconnect_code) member = Member(anonymous=anonymous, room=room_obj, is_admin=True, id_code=member_id) db.add(member) db.commit() db.refresh(member) if username is None and user is None: raise ValueError('Username or user required') return {"room": room_obj, "member": member} def change_room_name(room: Room, name: str, db: Session): room.name = name db.add(room) db.commit() db.refresh(room) return room def change_room_status(room: Room, public: bool, db: Session): room.public = public db.add(room) db.commit() db.refresh(room) return room def delete_room_db(room: Room, db: Session): db.delete(room) db.commit() return True def get_member_from_user(user_id: int, room_id: int, db: Session): member = db.exec(select(Member).where(Member.room_id == room_id, Member.user_id == user_id)).first() return member def get_member_from_token(token: str, room_id: int, db: Session): user = get_user_from_token(token, db) if user is False: return False if user is None: return None member = get_member_from_user(user.id, room_id, db) return member def get_member_from_anonymous(anonymous_id: int, room_id: int, db: Session): member = db.exec(select(Member).where(Member.room_id == room_id, Member.anonymous_id == anonymous_id)).first() return member def get_member_from_reconnect_code(reconnect_code: str, room_id: int, db: Session): anonymous = get_anonymous_from_code(reconnect_code, db) if anonymous is None: return None member = get_member_from_anonymous(anonymous.id, room_id, db) return member def get_anonymous_from_code(reconnect_code: str, db: Session): anonymous = db.exec(select(Anonymous).where( Anonymous.reconnect_code == reconnect_code)).first() return anonymous def get_anonymous_from_clientId(clientId: str, db: Session): anonymous = db.exec(select(Anonymous).where( Anonymous.clientId == clientId)).first() return anonymous def get_member_from_clientId(clientId: str, room_id: int, db: Session): anonymous = get_anonymous_from_clientId(clientId, db) if anonymous is None: return None member = get_member_from_anonymous(anonymous.id, room_id, db) return member def create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False, db: Session): member_id = generate_unique_code(Member, s=db) member = Member(room=room, user=user, anonymous=anonymous, waiting=waiting, id_code=member_id) member.online = True db.add(member) db.commit() db.refresh(member) return member def get_or_create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False, db: Session): member = user is not None and get_member_from_user(user.id, room.id, db) if member is not None and member is not False: return member member = create_member(room=room, user=user, anonymous=anonymous, waiting=waiting, db=db) def connect_member(member: Member, db: Session): member.online = True db.add(member) db.commit() db.refresh(member) return member def disconnect_member(member: Member, db: Session): if member.waiting == False: member.online = False if member.anonymous is not None: change_anonymous_clientId(member.anonymous, db) db.add(member) db.commit() db.refresh(member) return member else: db.delete(member) db.commit() return member def validate_username(username: str, room: Room, db: Session = Depends(get_session)): if len(username) > 20: return None members = select(Member.anonymous_id).where( Member.room_id == room.id, Member.anonymous_id != None) anonymous = select(Anonymous).where( col(Anonymous.id).in_(members), Anonymous.username == username) username_anonymous = db.exec(anonymous).first() return None if username_anonymous is not None else username def create_anonymous_member(username: str, room: Room, db: Session): username = validate_username(username, room, db) if username is None: return None reconnect_code = generate_unique_code( Anonymous, s=db, field_name="reconnect_code") anonymous = Anonymous(username=username, reconnect_code=reconnect_code, clientId=uuid4()) member_id = generate_unique_code(Member, s=db) member = Member(room=room, anonymous=anonymous, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def create_anonymous(username: str, room: Room, db: Session): username = validate_username(username, room, db) if username is None: return None reconnect_code = generate_unique_code( Anonymous, s=db, field_name="reconnect_code") anonymous = Anonymous(username=username, reconnect_code=reconnect_code) db.add(anonymous) db.commit() db.refresh(anonymous) return anonymous def check_user_in_room(user_id: int, room_id: int, db: Session): user = db.exec(select(Member).where(Member.user_id == user_id, Member.room_id == room_id)).first() return user def create_user_member(user: User, room: Room, db: Session): member = get_member_from_user(user.id, room.id, db) if member is not None: return None member_id = generate_unique_code(Member, s=db) member = Member(room=room, user=user, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def create_anonymous_waiter(username: str, room: Room, db: Session): username = validate_username(username, room, db) if username is None: return None reconnect_code = generate_unique_code( Anonymous, s=db, field_name="reconnect_code") anonymous = Anonymous(username=username, reconnect_code=reconnect_code) member_id = generate_unique_code(Member, s=db) member = Member(room=room, anonymous=anonymous, waiting=True, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def create_user_waiter(user: User, room: Room, db: Session): member = get_member_from_user(user.id, room.id, db) if member is not None: return member member = create_member(room=room, user=user, waiting=True, db=db) return member def get_waiter(waiter_code: str, db: Session): return db.exec(select(Member).where(Member.id_code == waiter_code, Member.waiting == True)).first() def get_member(id_code: str, room_id: str, db: Session): return db.exec(select(Member).where(Member.id_code == id_code, Member.room_id == room_id)).first() def delete_member(member: Member, db: Session): db.delete(member) db.commit() return None def accept_waiter(member: Member, db: Session): member.waiting = False member.waiter_code = None member.online = True db.add(member) db.commit() db.refresh(member) return member def refuse_waiter(member: Member, db: Session): db.delete(member) db.commit() return None def leave_room(member: Member, db: Session): # db.execute(delete(Challenger).where(col(Challenger.member_id) == member.id)) # db.execute(delete(TmpCorrection).where(col(TmpCorrection.member_id) == member.id)) db.delete(member) db.commit() return None def serialize_member(member: Member, private: bool = False, admin: bool = False, m2: Member | None = None) -> MemberRead | Waiter: member_obj = member.user or member.anonymous if not member.waiting: return MemberRead(username=member_obj.username, online=member.online, clientId=str(member_obj.clientId) if (private == True and member.user_id == None) else "", reconnect_code=getattr(member_obj, "reconnect_code", "") if (admin or m2 == member) else "", isUser=member.user_id != None, isAdmin=member.is_admin, id_code=member.id_code).dict() if member.waiting: return Waiter(username=member_obj.username, waiter_id=member.id_code).dict() def serialize_parcours_short(parcours: Parcours, member: Member, db: Session): challenger = getChallenger(parcours, member, db) return ParcoursReadShort(name=parcours.name, id_code=parcours.id_code, best_note=challenger.best, validated=challenger.validated, avg=challenger.avg) def serialize_challenge(challenge: Challenge): return Challenges( name=challenge.challenger.user.username if challenge.challenger.user is not None else challenge.challenger.anonymous.username, value=Note(note=challenge.note, time=challenge.time), isCorriged=challenge.isCorriged, canCorrige=challenge.data is not None) def serialize_room(room: Room, member: Member, db: Session): return RoomInfo(**room.dict(), parcours=[serialize_parcours_short(p, member, db) for p in room.parcours], members=[serialize_member(m, admin=member.is_admin, m2=member) for m in room.members]) def getUsername(m: Member): return m.user.username if m.user is not None else m.anonymous.username def getChallengerInfo(c: Challenge, db: Session): challenger = db.exec(select(Challenger).where(Challenger.member_id == c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first() if challenger is not None: member = challenger.member return {"name": getUsername(member), "id_code": member.id_code} def getChallenges(c: Challenger, db: Session): challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == c.member_id, Challenge.challenger_pid == c.parcours_id)).all() return challenges def getMemberChallenges(m: Member, p: Parcours, db: Session): challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == m.id, Challenge.challenger_pid == p.id)).all() return challenges def getTops(p: Parcours, db: Session): tops = db.exec(select(Challenge).where(Challenge.parcours_id == p.id_code).order_by( col(Challenge.mistakes), col(Challenge.time)).limit(3)).all() tops = [{"challenger": getChallengerInfo( t, db), "mistakes": t.mistakes, "time": t.time} for t in tops] return tops def getAvgTops(p: Parcours, db: Session): avgTop = db.exec(select(Challenger).where(Challenger.parcours_id == p.id).order_by(col(Challenger.avg)).limit(3)).all() avgTop = [{"id_code": t.member.id_code, "avg": t.avg, "name": getUsername(t.member)} for t in avgTop] return avgTop def getRank(c: Challenger, p: Parcours, db: Session): noteRank = db.exec(select([func.count(Challenge.id)]).where(Challenge.parcours_id == p.id_code).order_by( col(Challenge.mistakes), col(Challenge.time)).where(Challenge.mistakes < c.best, Challenge.time < c.best_time)).one() return noteRank + 1 def getAvgRank(c: Challenger, p: Parcours, db: Session): avgRank = db.exec(select([func.count(Challenger.member_id)]).where( Challenger.parcours_id == p.id).order_by(col(Challenger.avg)).where(Challenger.avg < c.avg)).one() return avgRank + 1 def getMemberRank(m: Member, p: Parcours, db: Session): challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first() if challenger is None or challenger.best is None: return None return getRank(challenger, p, db) def getMemberAvgRank(m: Member, p: Parcours, db: Session): challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first() if challenger is None or challenger.avg is None: return None return getAvgRank(challenger, p, db) def getMemberValidated(m: Member, p: Parcours, db: Session): challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first() if challenger is None or challenger.validated is None: return None return challenger.validated def serialize_parcours(parcours: Parcours, member: Member, db: Session): tops = getTops(parcours, db) avgTop = getAvgTops(parcours, db) challenger = db.exec(select(Challenger).where( Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first() noteRank = None avgRank = None pb = None if challenger is not None and challenger.avg is not None and challenger.best is not None: noteRank = getRank(challenger, parcours, db) avgRank = getAvgRank(challenger, parcours, db) pb = {"mistakes": challenger.best, "time": challenger.best_time} statement = select(Challenger).where(Challenger.parcours_id == parcours.id) if not member.is_admin: statement = statement.where(Challenger.member_id == member.id) challengers = db.exec(statement).all() challs = {c.member.id_code: { "challenger": {"id_code": c.member.id_code, "name": getUsername(c.member), "validated": c.validated}, # 'validated': chall.mistakes <= parcours.max_mistakes "challenges": [Challenges(**{**chall.dict(), "canCorrige": chall.data != []}) for chall in getChallenges(c, db)] } for c in challengers} return {**parcours.dict(), "pb": pb, "tops": tops, "challenges": challs, "rank": noteRank, "memberRank": avgRank, "validated": challenger.validated if challenger != None else False, "ranking": avgTop} def change_anonymous_clientId(anonymous: Anonymous, db: Session): anonymous.clientId = uuid.uuid4() db.add(anonymous) db.commit() db.refresh(anonymous) return anonymous # Parcours from services.io import add_fast_api_root from generateur.generateur_main import generate_from_path, parseGeneratorOut def countInput(ex: Exercice, q: int): exo = parseGeneratorOut(generate_from_path(add_fast_api_root( ex.exo_source), 1, "web")) return len(exo.inputs) * q class ExoToCount(BaseModel): ex: Exercice q: int def getTotal(exs: list[ExoToCount]): total = 0 for e in exs: total += countInput(e.ex, e.q) return total def validate_exercices(exos: List[ExercicesCreate], db: Session): exercices = db.exec(select(Exercice).where(Exercice.web == True).where( col(Exercice.id_code).in_([e.exercice_id for e in exos]))).all() exos_id_list = [e.exercice_id for e in exos] # exoToCountList = [ExoToCount(ex=e, q=q) for e, q in zip(exercices, [c.quantity for c in exos])] exercices.sort(key=lambda e: exos_id_list.index(e.id_code)) return [Exercices(exercice_id=e.id_code, name=e.name, quantity=[ex for ex in exos if ex.exercice_id == e.id_code][0].quantity, examples=e.examples).dict() for e in exercices] def create_parcours_db(parcours: ParcoursCreate, room_id: int, db: Session): exercices = validate_exercices(parcours.exercices, db) if len(exercices) == 0: return "Veuillez entrer au moins un exercice valide" id_code = generate_unique_code(Parcours, s=db) parcours_obj = Parcours( **{**parcours.dict(), "exercices": exercices}, room_id=room_id, id_code=id_code) db.add(parcours_obj) db.commit() db.refresh(parcours_obj) return parcours_obj def deleteParcoursRelated(parcours: Parcours, db: Session): db.exec(delete(Challenge).where(Challenge.parcours_id == parcours.id_code)) db.exec(delete(TmpCorrection).where( TmpCorrection.parcours_id == parcours.id_code)) db.exec(delete(Challenger).where(Challenger.parcours_id == parcours.id)) db.commit() def change_challengers_validation(p: Parcours, db: Session): stmt = update(Challenger).values( validated=select(Challenge.id).where(Challenge.challenger_mid == Challenger.member_id, Challenge.challenger_pid == Challenger.parcours_id, Challenge.validated == 1).exists()).where( Challenger.parcours_id == p.id) db.execute(stmt) db.commit() return def change_challenges_validation(p: Parcours, db: Session): challenges = db.exec(select(Challenge).where( Challenge.parcours_id == p.id_code)).all() challs = [] for c in challenges: validated = c.time <= p.time * 60 and c.mistakes <= p.max_mistakes if validated != c.validated: c.validated = validated challs.append(c) db.bulk_save_objects(challs) db.commit() def changeValidation(p: Parcours, db: Session): change_challenges_validation(p, db) change_challengers_validation(p, db) def compareExercices(old: list[Exercices], new: list[ExercicesCreate]): old = [{"id": o['exercice_id'], "q": o['quantity']} for o in old] new = [{"id": n.exercice_id, "q": n.quantity} for n in new] return old == new def update_parcours_db(parcours: ParcoursCreate, parcours_obj: Parcours, db: Session): update_challenges = False if not compareExercices(parcours_obj.exercices, parcours.exercices): exercices = validate_exercices(parcours.exercices, db) if len(exercices) == 0: return "Veuillez entrer au moins un exercice valide" deleteParcoursRelated(parcours_obj, db) update_challenges = True parcours_obj.exercices = exercices update_validated = parcours_obj.max_mistakes != parcours.max_mistakes or parcours_obj.time != parcours.time parcours_obj.name = parcours.name parcours_obj.time = parcours.time parcours_obj.max_mistakes = parcours.max_mistakes db.add(parcours_obj) db.commit() db.refresh(parcours_obj) if update_validated: changeValidation(parcours_obj, db) return parcours_obj, update_challenges def delete_parcours_db(parcours: Parcours, db: Session): db.delete(parcours) db.commit() return True class CorrigedChallenge(BaseModel): data: List[List[CorrigedGeneratorOut]] mistakes: int isCorriged: bool def create_tmp_correction(data: List[CorrigedData], parcours_id: str, member: Member, db: Session): code = generate_unique_code(TmpCorrection, s=db) tmpCorr = TmpCorrection(data=data, id_code=code, member=member, parcours_id=parcours_id) db.add(tmpCorr) db.commit() db.refresh(tmpCorr) return tmpCorr def validate_challenge_input(obj: List[CorrectionData], corr: TmpCorrection): data = corr.data if len(obj) != len(data): return False for i in range(len(data)): exo_corr = data[i] exo = obj[i] if len(exo.data) != len(exo_corr['data']): return zipped = zip(exo_corr['data'], exo.data) same = all([e['calcul'] == f.calcul and len(e['inputs']) == len(f.inputs) for e, f in zipped]) if not same: return False return True def validate_challenge_correction(obj: List[CorrigedData], chall: Challenge): data = chall.data if len(obj) != len(data): return False for i in range(len(data)): exo_corr = data[i] exo = obj[i] if len(exo.data) != len(exo_corr['data']): return zipped = zip(exo_corr['data'], exo.data) same = all([e['calcul'] == f.calcul and len(e['inputs']) == len(f.inputs) for e, f in zipped]) if not same: return False return True def corrige_challenge(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection) -> CorrigedChallenge: if validate_challenge_input(obj, corr) is False: return None data = corr.data note = 0 total = 0 isCorriged = True mistakes = 0 for i in range(len(data)): exo_corr = data[i]["data"] exo = obj[i].data if len(exo) != len(exo_corr): return zipped = zip(exo_corr, exo) for e, f in zipped: for k, l in zip(e['inputs'], f.inputs): k["value"] = str(l.value) total += 1 if k['correction'] is None: isCorriged = False k['valid'] = None elif str(k["correction"]) == str(l.value): k['valid'] = True note += 1 else: k['valid'] = False mistakes += 1 return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged} return {"data": data, "mistakes": mistakes, "note": {"value": note, "total": total}, "isCorriged": isCorriged} def change_correction(obj: List[CorrigedData], chall: Challenge) -> CorrigedChallenge: if validate_challenge_correction(obj, chall) is False: return None data = deepcopy(chall.data) note = 0 total = 0 isCorriged = True mistakes = 0 for i in range(len(data)): exo_corr = data[i]['data'] exo = obj[i].data if len(exo) != len(exo_corr): return zipped = zip(exo_corr, exo) for e, f in zipped: for k, l in zip(e['inputs'], f.inputs): k["correction"] = l.correction k["valid"] = l.valid total += 1 if k['correction'] is None and l.valid is None: isCorriged = False if l.valid is True: note += 1 else: mistakes += 1 return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged} return {"data": data, "note": {"value": note, "total": total}, "isCorriged": isCorriged} def getChallenger(parcours: Parcours, member: Member, db: Session): challenger = db.exec(select(Challenger).where( Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first() if challenger is None: return Challenger(parcours_id=parcours.id, member_id=member.id) return challenger def ChallengerFromChallenge(c: Challenge, db: Session): challenger = db.exec(select(Challenger).where( Challenger.member_id == c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first() return challenger def checkValidated(challenger: Challenger, db: Session, challenge: Challenge | None = None): challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == challenger.member_id, Challenge.challenger_pid == challenger.parcours_id, Challenge.validated == True)).all() if challenge is not None: challenges = [c for c in challenges if c.id != challenge.id] return len(challenges) != 0 def create_challenge(data: List[CorrigedData], challenger: Member, parcours: Parcours, time: int, mistakes: int, isCorriged: bool, db: Session): challenger_obj: Challenger = getChallenger(parcours, challenger, db) validated = time <= parcours.time * 60 and mistakes <= parcours.max_mistakes challenge = Challenge(data=data, challenger_pid=challenger_obj.parcours_id, challenger_mid=challenger_obj.member_id, parcours=parcours, time=time, mistakes=mistakes, isCorriged=isCorriged, id_code=generate_unique_code(Challenge, s=db), validated=validated) if (challenger_obj.best is not None and challenger_obj.best > mistakes) or challenger_obj.best is None: challenger_obj.best = mistakes challenger_obj.best_time = time challenges = db.exec(select([func.count(Challenge.id)]).where( Challenge.challenger_mid == challenger_obj.member_id, Challenge.challenger_pid == parcours.id)).one() if validated and challenger_obj.validated is False: challenger_obj.validated = True avg = challenger_obj.avg if avg is None: avg = 0 challenger_obj.avg = (avg * (challenges - 1) + mistakes) / (challenges) db.add(challenge) db.add(challenger_obj) db.commit() db.refresh(challenge) db.refresh(challenger_obj) return challenge, challenger_obj def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session): challenger = ChallengerFromChallenge(challenge, db) challengesCount = len(getChallenges(challenger, db)) avg = challenger.avg * challengesCount - challenge.mistakes parcours = challenge.parcours if challenger.best > corriged['mistakes']: challenger.best = corriged['mistakes'] challenger.best_time = challenge.time validated = challenge.time <= parcours.time * 60 and corriged['mistakes'] <= parcours.max_mistakes challenge.validated = validated if challenger.validated == False and validated: challenger.validated = True elif challenger.validated == True and not validated: challenger.validated = checkValidated(challenger, db, challenge) challenger.avg = (avg + corriged['mistakes']) / challengesCount challenge.data = corriged['data'] challenge.mistakes = corriged['mistakes'] challenge.isCorriged = corriged['isCorriged'] # challenge.validated = corriged['mistakes'] <= parcours.max_mistakes db.add(challenge) db.add(challenger) db.commit() db.refresh(challenge) db.refresh(challenger) return challenge, challenger # Dependencies def check_room(room_id: str, db: Session = Depends(get_session)): room = db.exec(select(Room).where(Room.id_code == room_id)).first() return room def get_room(room_id, db: Session = Depends(get_session)): room = db.exec(select(Room).where(Room.id_code == room_id)).first() if room is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Salle introuvable") return room def get_member_dep(room: Room = Depends(get_room), user: User = Depends(get_current_user_optional), clientId: str | None = Query(default=None), db: Session = Depends(get_session)): if user is None and clientId is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") member = None if user is not None: member = get_member_from_user(user.id, room.id, db) elif clientId is not None: member = get_member_from_clientId(clientId, room.id, db) if member is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous n'ĂȘtes pas dans cette salle") return member def check_admin(member: Member = Depends(get_member_dep)): if member.is_admin is False: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous devez ĂȘtre administrateur pour faire cela") return member def get_parcours(parcours_id: str, room: Room = Depends(get_room), db: Session = Depends(get_session)): room = db.exec(select(Parcours).where(Parcours.id_code == parcours_id, Parcours.room_id == room.id)).first() if room is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Parcours introuvable") return room def get_exercices(parcours: Parcours = Depends(get_parcours), db: Session = Depends(get_session)): exercices = db.exec(select(Exercice).where(col(Exercice.id_code).in_( [e['exercice_id'] for e in parcours.exercices]))).all() return [{"exercice": e, "quantity": [q for q in parcours.exercices if q['exercice_id'] == e.id_code][0]['quantity']} for e in exercices] def get_correction(correction_id: str, parcours_id: str, member: Member = Depends(get_member_dep), db: Session = Depends(get_session)): tmpCorr = db.exec(select(TmpCorrection).where( TmpCorrection.id_code == correction_id, TmpCorrection.parcours_id == parcours_id)).first() if tmpCorr is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Correction introuvable") if member != tmpCorr.member: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="It's not your challenge") return tmpCorr def get_challenge(challenge_id: str, db: Session = Depends(get_session)): challenge = db.exec(select(Challenge).where( Challenge.id_code == challenge_id)).first() if challenge is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge introuvable") if challenge.data == []: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Impossible de corriger ce challenge") return challenge