from services.auth import get_current_user_optional from services.misc import noteOn20 from copy import deepcopy from typing import Dict, List import uuid from fastapi import Body, Depends, HTTPException, status from pydantic import BaseModel from sqlmodel import Session, delete, select, col, table from database.db import get_session from database.room.models import Anonymous, Challenge, Challenges, CorrigedGeneratorOut, Exercices, ExercicesCreate, Member, Note, Parcours, ParcoursCreate, ParcoursReadShort, ParsedGeneratorOut, Room, RoomCreate, RoomInfo, RoomRead, TmpCorrection, Waiter, MemberRead from database.auth.models import User from services.database import generate_unique_code from database.auth.crud import get_user_from_token from database.exercices.models import Exercice def create_room_db(*, room: RoomCreate, user: User | None = None, username: str | None = None, db: Session): id_code = generate_unique_code(Room, s=db) member_id = generate_unique_code(Member, s=db) room_obj = Room(**room.dict(exclude_unset=True), id_code=id_code) if user is not None: member = Member(user_id=user.id, room=room_obj, is_admin=True, id_code=member_id) db.add(member) db.commit() db.refresh(member) if username is not None: reconnect_code = generate_unique_code( Anonymous, s=db, field_name='reconnect_code') anonymous = Anonymous(username=username, reconnect_code=reconnect_code) member = Member(anonymous=anonymous, room=room_obj, is_admin=True, id_code=member_id) db.add(member) db.commit() db.refresh(member) if username is None and user is None: raise ValueError('Username or user required') return {"room": room_obj, "member": member} def change_room_name(room: Room, name: str, db: Session): room.name = name db.add(room) db.commit() db.refresh(room) return room def change_room_status(room: Room, public: bool, db: Session): room.public = public db.add(room) db.commit() db.refresh(room) return room def get_member_from_user(user_id: int, room_id: int, db: Session): member = db.exec(select(Member).where(Member.room_id == room_id, Member.user_id == user_id)).first() return member def get_member_from_token(token: str, room_id: int, db: Session): user = get_user_from_token(token, db) if user is False: return False if user is None: return None member = get_member_from_user(user.id, room_id, db) return member def get_member_from_anonymous(anonymous_id: int, room_id: int, db: Session): member = db.exec(select(Member).where(Member.room_id == room_id, Member.anonymous_id == anonymous_id)).first() return member def get_member_from_reconnect_code(reconnect_code: str, room_id: int, db: Session): anonymous = get_anonymous_from_code(reconnect_code, db) if anonymous is None: return None member = get_member_from_anonymous(anonymous.id, room_id, db) return member def get_anonymous_from_code(reconnect_code: str, db: Session): anonymous = db.exec(select(Anonymous).where( Anonymous.reconnect_code == reconnect_code)).first() return anonymous def get_anonymous_from_clientId(clientId: str, db: Session): anonymous = db.exec(select(Anonymous).where( Anonymous.clientId == clientId)).first() return anonymous def get_member_from_clientId(clientId: str, room_id: int, db: Session): anonymous = get_anonymous_from_clientId(clientId, db) if anonymous is None: return None member = get_member_from_anonymous(anonymous.id, room_id, db) return member def create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False, db: Session): member_id = generate_unique_code(Member, s=db) member = Member(room=room, user=user, anonymous=anonymous, waiting=waiting, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def get_or_create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False, db: Session): member = user is not None and get_member_from_user(user.id, room.id, db) if member is not None and member is not False: return member member= create_member(room=room, user=user, anonymous=anonymous, waiting=waiting, db=db) def connect_member(member: Member, db: Session): member.online = True db.add(member) db.commit() db.refresh(member) return member def disconnect_member(member: Member, db: Session): if member.waiting == False: member.online = False if member.anonymous is not None: change_anonymous_clientId(member.anonymous,db) db.add(member) db.commit() db.refresh(member) return member else: db.delete(member) db.commit() return member def validate_username(username: str, room: Room, db: Session = Depends(get_session)): if len(username) > 20: return None members = select(Member.anonymous_id).where( Member.room_id == room.id, Member.anonymous_id != None) anonymous = select(Anonymous).where( col(Anonymous.id).in_(members), Anonymous.username == username) username_anonymous = db.exec(anonymous).first() return None if username_anonymous is not None else username def create_anonymous_member(username: str, room: Room, db: Session): username = validate_username(username, room, db) if username is None: return None reconnect_code = generate_unique_code( Anonymous, s=db, field_name="reconnect_code") anonymous = Anonymous(username=username, reconnect_code=reconnect_code) member_id = generate_unique_code(Member, s=db) member = Member(room=room, anonymous=anonymous, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def create_anonymous(username: str, room: Room, db: Session): username = validate_username(username, room, db) if username is None: return None reconnect_code = generate_unique_code( Anonymous, s=db, field_name="reconnect_code") anonymous = Anonymous(username=username, reconnect_code=reconnect_code) db.add(anonymous) db.commit() db.refresh(anonymous) return anonymous def check_user_in_room(user_id: int, room_id: int, db: Session): user = db.exec(select(Member).where(Member.user_id==user_id, Member.room_id == room_id)).first() return user def create_user_member(user: User, room: Room, db: Session): member = get_member_from_user(user.id, room.id, db) if member is not None: return None member_id = generate_unique_code(Member, s=db) member = Member(room=room, user=user, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def create_anonymous_waiter(username: str, room: Room, db: Session): username = validate_username(username, room, db) if username is None: return None reconnect_code = generate_unique_code( Anonymous, s=db, field_name="reconnect_code") anonymous = Anonymous(username=username, reconnect_code=reconnect_code) member_id = generate_unique_code(Member, s=db) member = Member(room=room, anonymous=anonymous, waiting=True, id_code=member_id) db.add(member) db.commit() db.refresh(member) return member def create_user_waiter(user: User, room: Room, db: Session): member = get_member_from_user(user.id, room.id, db) if member is not None: return member member = create_member(room=room, user=user, waiting=True, db=db) return member def get_waiter(waiter_code: str, db: Session): return db.exec(select(Member).where(Member.id_code == waiter_code, Member.waiting == True)).first() def get_member(id_code: str, room_id: str, db: Session): return db.exec(select(Member).where(Member.id_code == id_code, Member.room_id == room_id)).first() def delete_member(member: Member, db: Session): db.delete(member) db.commit() return None def accept_waiter(member: Member, db: Session): member.waiting = False member.waiter_code = None db.add(member) db.commit() db.refresh(member) return member def refuse_waiter(member: Member, db: Session): db.delete(member) db.commit() return None def leave_room(member: Member, db: Session): db.delete(member) db.commit() return None def serialize_member(member: Member) -> MemberRead | Waiter: member_obj = member.user or member.anonymous if member.waiting == False: return MemberRead(username=member_obj.username, reconnect_code=getattr(member_obj, "reconnect_code", ""), isUser=member.user_id != None, isAdmin=member.is_admin, id_code=member.id_code).dict() if member.waiting == True: return Waiter(username=member_obj.username, waiter_id=member.id_code).dict() def serialize_parcours_short(parcours: Parcours, member: Member, db: Session): best_note = db.exec(select(Challenge.note, Challenge.time).where(Challenge.parcours_id == parcours.id, Challenge.challenger_id == member.id).order_by(col(Challenge.note).desc()).limit(1)).first() note = None if best_note is not None: best_note=best_note[0] note = Note(note=best_note[0], time=best_note[1]) return ParcoursReadShort(**parcours.dict(exclude_unset=True), best_note=note) def serialize_challenge(challenge: Challenge): return Challenges(name=challenge.challenger.user.username if challenge.challenger.user is not None else challenge.challenger.anonymous.username, value=Note(note=challenge.note, time=challenge.time), isCorriged=challenge.isCorriged, canCorrige=challenge.data is not None) def serialize_parcours_short(parcours: Parcours, member: Member, db: Session): if member.is_member == False: challenges = db.exec(select(Challenge).where(Challenge.parcours_id == parcours.id, Challenge.challenger_id == member.id)).all() else: challenges = db.exec(select(Challenge).where( Challenge.parcours_id == parcours.id)).all() challenges = [serialize_challenge(c) for c in challenges] return Parcours(**parcours.dict(), challenges=challenges) def serialize_room(room: Room, member: Member, db: Session): return RoomInfo(**room.dict(), parcours=[serialize_parcours_short(p, member, db) for p in room.parcours], members=[serialize_member(m) for m in room.members]) def change_anonymous_clientId(anonymous: Anonymous, db: Session): anonymous.clientId = uuid.uuid4() db.add(anonymous) db.commit() db.refresh(anonymous) return anonymous #Parcours def validate_exercices(exos: List[ExercicesCreate], db: Session ): exercices = db.exec(select(Exercice).where(Exercice.web == True).where(col(Exercice.id_code).in_([e.exercice_id for e in exos]))).all() exos_id_list = [e.exercice_id for e in exos] exercices.sort(key=lambda e: exos_id_list.index(e.id_code)) return [Exercices(exercice_id=e.id_code, name=e.name, quantity=[ex for ex in exos if ex.exercice_id == e.id_code][0].quantity).dict() for e in exercices] def create_parcours_db(parcours: ParcoursCreate,room_id: int, db: Session): exercices = validate_exercices(parcours.exercices, db) if len(exercices) == 0: return "Veuillez entrer au moins un exercice valide" id_code = generate_unique_code(Parcours, s=db) parcours_obj = Parcours(**{**parcours.dict(), "exercices": exercices}, room_id=room_id, id_code=id_code) print(parcours_obj) db.add(parcours_obj) db.commit() db.refresh(parcours_obj) return parcours_obj def deleteParcoursRelated(parcours: Parcours, db: Session): db.exec(delete(Challenge).where(Challenge.parcours_id == parcours.id_code)) db.exec(delete(TmpCorrection).where(TmpCorrection.parcours_id == parcours.id_code)) db.commit() def update_parcours_db(parcours: ParcoursCreate, parcours_obj: Parcours, db: Session): exercices = validate_exercices(parcours.exercices, db) if len(exercices) == 0: return "Veuillez entrer au moins un exercice valide" parcours_data = parcours.dict(exclude_unset=True) for key, value in parcours_data.items(): setattr(parcours_obj, key, value) parcours_obj.exercices = exercices db.add(parcours_obj) db.commit() deleteParcoursRelated(parcours_obj, db) db.refresh(parcours_obj) return parcours_obj def delete_parcours_db(parcours: Parcours, db: Session): db.delete(parcours) db.commit() return True class CorrigedChallenge(BaseModel): data: List[List[CorrigedGeneratorOut]] note: Note isCorriged: bool def create_tmp_correction(data: List[List[CorrigedGeneratorOut]], parcours_id: str, member: Member, db: Session): code = generate_unique_code(TmpCorrection, s=db) tmpCorr = TmpCorrection(data=data, id_code=code, member=member, parcours_id=parcours_id) db.add(tmpCorr) db.commit() db.refresh(tmpCorr) return tmpCorr def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session): challenge.data = corriged['data'] challenge.note = corriged['note'] challenge.isCorriged = corriged['isCorriged'] challenge.validated = noteOn20( corriged['note']['value'], corriged['note']['total']) > challenge.parcours.validate_condition db.add(challenge) db.commit() db.refresh(challenge) return challenge def validate_challenge_input(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection): data = corr.data if len(obj) != len(data): return False for i in range(len(data)): exo_corr = data[i] exo = obj[i] if len(exo) != len(exo_corr): return zipped = zip(exo_corr, exo) same = all([e['calcul'] == f.calcul and len(e['inputs']) == len(f.inputs) for e,f in zipped]) if not same: return False return True def validate_challenge_correction(obj: List[List[CorrigedGeneratorOut]], chall: Challenge): data = chall.data if len(obj) != len(data): return False for i in range(len(data)): exo_corr = data[i] exo = obj[i] if len(exo) != len(exo_corr): return zipped = zip(exo_corr, exo) same = all([e['calcul'] == f.calcul and len(e['inputs']) == len(f.inputs) for e,f in zipped]) if not same: return False return True def corrige_challenge(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection) -> CorrigedChallenge: if validate_challenge_input(obj , corr) is False: return None data = corr.data note = 0 total = 0 isCorriged = True for i in range(len(data)): exo_corr = data[i] exo = obj[i] if len(exo) != len(exo_corr): return zipped = zip(exo_corr, exo) for e, f in zipped: for k, l in zip(e['inputs'], f.inputs): k["value"] = str(l.value) total += 1 if k['correction'] is None: isCorriged = False if str(k["correction"]) == str(l.value): note += 1 return {"data": data, "note": {"value": 1, "total": 3}, "isCorriged": isCorriged} def change_correction(obj: List[List[CorrigedGeneratorOut]], chall: Challenge) -> CorrigedChallenge: if validate_challenge_correction(obj, chall) is False: return None data = deepcopy(chall.data) note = 0 total = 0 isCorriged = True for i in range(len(data)): exo_corr = data[i] exo = obj[i] if len(exo) != len(exo_corr): return zipped = zip(exo_corr, exo) for e, f in zipped: for k, l in zip(e['inputs'], f.inputs): k["correction"] = str(l.correction) total += 1 if k['correction'] is None: isCorriged = False if str(k["correction"]) == str(l.value): note += 1 return {"data": data, "note": {"value": note, "total": total}, "isCorriged": isCorriged} def create_challenge(data: List[List[CorrigedGeneratorOut]], challenger: Member, parcours: Parcours, time: int, note: Note, isCorriged: bool, db: Session): challenge = Challenge(data=data, challenger=challenger, parcours=parcours, time=time, note=note, validated=noteOn20(note["value"], note['total']) >= parcours.validate_condition, isCorriged=isCorriged, id_code=generate_unique_code(Challenge, s=db)) db.add(challenge) db.commit() db.refresh(challenge) return challenge def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session): challenge.data = corriged['data'] challenge.note = corriged['note'] challenge.isCorriged = corriged['isCorriged'] challenge.validated = noteOn20( corriged['note']['value'], corriged['note']['total']) > challenge.parcours.validate_condition db.add(challenge) db.commit() db.refresh(challenge) return challenge # Dependencies def check_room(room_id: str, db: Session = Depends(get_session)): room = db.exec(select(Room).where(Room.id_code == room_id)).first() return room def get_room(room_id, db: Session = Depends(get_session)): room = db.exec(select(Room).where(Room.id_code == room_id)).first() if room is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Salle introuvable") return room def get_member_dep(room: Room = Depends(get_room), user: User = Depends(get_current_user_optional), clientId: str | None = Body(default=None), db: Session = Depends(get_session)): if user is None and clientId is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") if user is not None: member = get_member_from_user(user.id, room.id, db) if clientId is not None: member = get_member_from_clientId(clientId, room.id, db) if member is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous n'ĂȘtes pas dans cette salle") return member def check_admin(member: Member = Depends(get_member_dep)): if member.is_admin is False: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous devez ĂȘtre administrateur pour faire cela") return member def get_parcours(parcours_id: str, room: Room = Depends(get_room), db: Session = Depends(get_session)): room = db.exec(select(Parcours).where(Parcours.id_code == parcours_id, Parcours.room_id == room.id)).first() if room is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Parcours introuvable") return room def get_exercices(parcours: Parcours = Depends(get_parcours), db: Session = Depends(get_session)): exercices = db.exec(select(Exercice).where(col(Exercice.id_code).in_( [e['exercice_id'] for e in parcours.exercices]))).all() return [{"exercice": e, "quantity": [q for q in parcours.exercices if q['exercice_id'] == e.id_code][0]['quantity']} for e in exercices] def get_correction(correction_id: str, parcours_id: str, member: Member = Depends(get_member_dep), db: Session = Depends(get_session)): tmpCorr = db.exec(select(TmpCorrection).where( TmpCorrection.id_code == correction_id, TmpCorrection.parcours_id == parcours_id)).first() if tmpCorr is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Correction introuvable") if member != tmpCorr.member: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="It's not your challenge") return tmpCorr def get_challenge(challenge_id: str, parcours_id: str, db: Session = Depends(get_session)): challenge = db.exec(select(Challenge).where( Challenge.id_code == challenge_id, Challenge.parcours_id == parcours_id)).first() if challenge is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge introuvable") if challenge.data == []: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Impossible de corriger ce challenge") return challenge