generateur_v3/backend/api/database/room/crud.py

854 lines
30 KiB
Python

import uuid
from copy import deepcopy
from typing import List
from fastapi import Depends, HTTPException, status, Query
from pydantic import BaseModel
from sqlalchemy import func
from sqlmodel import Session, delete, select, col, update
from database.auth.crud import get_user_from_token
from database.auth.models import User
from database.db import get_session
from database.exercices.models import Exercice
from database.room.models import Anonymous, Challenge, Challenges, CorrigedGeneratorOut, Exercices, ExercicesCreate, \
Member, Note, Parcours, ParcoursCreate, ParcoursReadShort, ParsedGeneratorOut, Room, RoomCreate, RoomInfo, \
TmpCorrection, Waiter, MemberRead, CorrigedData, CorrectionData, Challenger
from services.auth import get_current_user_optional
from services.database import generate_unique_code
def create_room_db(*, room: RoomCreate, user: User | None = None, username: str | None = None, db: Session):
id_code = generate_unique_code(Room, s=db)
member_id = generate_unique_code(Member, s=db)
room_obj = Room(**room.dict(exclude_unset=True), id_code=id_code)
if user is not None:
member = Member(user_id=user.id, room=room_obj,
is_admin=True, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
if username is not None:
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name='reconnect_code')
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
member = Member(anonymous=anonymous, room=room_obj,
is_admin=True, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
if username is None and user is None:
raise ValueError('Username or user required')
return {"room": room_obj, "member": member}
def change_room_name(room: Room, name: str, db: Session):
room.name = name
db.add(room)
db.commit()
db.refresh(room)
return room
def change_room_status(room: Room, public: bool, db: Session):
room.public = public
db.add(room)
db.commit()
db.refresh(room)
return room
def delete_room_db(room: Room, db: Session):
db.delete(room)
db.commit()
return True
def get_member_from_user(user_id: int, room_id: int, db: Session):
member = db.exec(select(Member).where(Member.room_id ==
room_id, Member.user_id == user_id)).first()
return member
def get_member_from_token(token: str, room_id: int, db: Session):
user = get_user_from_token(token, db)
if user is False:
return False
if user is None:
return None
member = get_member_from_user(user.id, room_id, db)
return member
def get_member_from_anonymous(anonymous_id: int, room_id: int, db: Session):
member = db.exec(select(Member).where(Member.room_id ==
room_id, Member.anonymous_id == anonymous_id)).first()
return member
def get_member_from_reconnect_code(reconnect_code: str, room_id: int, db: Session):
anonymous = get_anonymous_from_code(reconnect_code, db)
if anonymous is None:
return None
member = get_member_from_anonymous(anonymous.id, room_id, db)
return member
def get_anonymous_from_code(reconnect_code: str, db: Session):
anonymous = db.exec(select(Anonymous).where(
Anonymous.reconnect_code == reconnect_code)).first()
return anonymous
def get_anonymous_from_clientId(clientId: str, db: Session):
anonymous = db.exec(select(Anonymous).where(
Anonymous.clientId == clientId)).first()
return anonymous
def get_member_from_clientId(clientId: str, room_id: int, db: Session):
anonymous = get_anonymous_from_clientId(clientId, db)
if anonymous is None:
return None
member = get_member_from_anonymous(anonymous.id, room_id, db)
return member
def create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False,
db: Session):
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, user=user, anonymous=anonymous, waiting=waiting,
id_code=member_id)
member.online = True
db.add(member)
db.commit()
db.refresh(member)
return member
def get_or_create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None,
waiting: bool = False, db: Session):
member = user is not None and get_member_from_user(user.id, room.id, db)
if member is not None and member is not False:
return member
member = create_member(room=room, user=user,
anonymous=anonymous, waiting=waiting, db=db)
def connect_member(member: Member, db: Session):
member.online = True
db.add(member)
db.commit()
db.refresh(member)
return member
def disconnect_member(member: Member, db: Session):
if member.waiting == False:
member.online = False
if member.anonymous is not None:
change_anonymous_clientId(member.anonymous, db)
db.add(member)
db.commit()
db.refresh(member)
return member
else:
db.delete(member)
db.commit()
return member
def validate_username(username: str, room: Room, db: Session = Depends(get_session)):
if len(username) > 20:
return None
members = select(Member.anonymous_id).where(
Member.room_id == room.id, Member.anonymous_id != None)
anonymous = select(Anonymous).where(
col(Anonymous.id).in_(members), Anonymous.username == username)
username_anonymous = db.exec(anonymous).first()
return None if username_anonymous is not None else username
def create_anonymous_member(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
return None
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name="reconnect_code")
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, anonymous=anonymous, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
return member
def create_anonymous(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
return None
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name="reconnect_code")
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
db.add(anonymous)
db.commit()
db.refresh(anonymous)
return anonymous
def check_user_in_room(user_id: int, room_id: int, db: Session):
user = db.exec(select(Member).where(Member.user_id ==
user_id, Member.room_id == room_id)).first()
return user
def create_user_member(user: User, room: Room, db: Session):
member = get_member_from_user(user.id, room.id, db)
if member is not None:
return None
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, user=user, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
return member
def create_anonymous_waiter(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
return None
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name="reconnect_code")
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, anonymous=anonymous,
waiting=True, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
return member
def create_user_waiter(user: User, room: Room, db: Session):
member = get_member_from_user(user.id, room.id, db)
if member is not None:
return member
member = create_member(room=room, user=user, waiting=True,
db=db)
return member
def get_waiter(waiter_code: str, db: Session):
return db.exec(select(Member).where(Member.id_code == waiter_code, Member.waiting == True)).first()
def get_member(id_code: str, room_id: str, db: Session):
return db.exec(select(Member).where(Member.id_code == id_code, Member.room_id == room_id)).first()
def delete_member(member: Member, db: Session):
db.delete(member)
db.commit()
return None
def accept_waiter(member: Member, db: Session):
member.waiting = False
member.waiter_code = None
member.online = True
db.add(member)
db.commit()
db.refresh(member)
return member
def refuse_waiter(member: Member, db: Session):
db.delete(member)
db.commit()
return None
def leave_room(member: Member, db: Session):
# db.execute(delete(Challenger).where(col(Challenger.member_id) == member.id))
# db.execute(delete(TmpCorrection).where(col(TmpCorrection.member_id) == member.id))
db.delete(member)
db.commit()
return None
def serialize_member(member: Member, private: bool = False, admin: bool = False,
m2: Member | None = None) -> MemberRead | Waiter:
member_obj = member.user or member.anonymous
if not member.waiting:
return MemberRead(username=member_obj.username, online=member.online,
clientId=str(member_obj.clientId) if (private == True and member.user_id == None) else "",
reconnect_code=getattr(member_obj, "reconnect_code", "") if (admin or m2 == member) else "",
isUser=member.user_id != None, isAdmin=member.is_admin, id_code=member.id_code).dict()
if member.waiting:
return Waiter(username=member_obj.username, waiter_id=member.id_code).dict()
def serialize_parcours_short(parcours: Parcours, member: Member, db: Session):
challenger = getChallenger(parcours, member, db)
return ParcoursReadShort(name=parcours.name, id_code=parcours.id_code, best_note=challenger.best,
validated=challenger.validated, avg=challenger.avg)
def serialize_challenge(challenge: Challenge):
return Challenges(
name=challenge.challenger.user.username if challenge.challenger.user is not None else challenge.challenger.anonymous.username,
value=Note(note=challenge.note, time=challenge.time), isCorriged=challenge.isCorriged,
canCorrige=challenge.data is not None)
def serialize_room(room: Room, member: Member, db: Session):
return RoomInfo(**room.dict(), parcours=[serialize_parcours_short(p, member, db) for p in room.parcours],
members=[serialize_member(m, admin=member.is_admin, m2=member) for m in room.members])
def getUsername(m: Member):
return m.user.username if m.user is not None else m.anonymous.username
def getChallengerInfo(c: Challenge, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id ==
c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first()
if challenger is not None:
member = challenger.member
return {"name": getUsername(member), "id_code": member.id_code}
def getChallenges(c: Challenger, db: Session):
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == c.member_id,
Challenge.challenger_pid == c.parcours_id)).all()
return challenges
def getMemberChallenges(m: Member, p: Parcours, db: Session):
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == m.id,
Challenge.challenger_pid == p.id)).all()
return challenges
def getTops(p: Parcours, db: Session):
tops = db.exec(select(Challenge).where(Challenge.parcours_id == p.id_code).order_by(
col(Challenge.mistakes), col(Challenge.time)).limit(3)).all()
tops = [{"challenger": getChallengerInfo(
t, db), "mistakes": t.mistakes, "time": t.time} for t in tops]
return tops
def getAvgTops(p: Parcours, db: Session):
avgTop = db.exec(select(Challenger).where(Challenger.parcours_id ==
p.id).order_by(col(Challenger.avg)).limit(3)).all()
avgTop = [{"id_code": t.member.id_code, "avg": t.avg,
"name": getUsername(t.member)} for t in avgTop]
return avgTop
def getRank(c: Challenger, p: Parcours, db: Session):
noteRank = db.exec(select([func.count(Challenge.id)]).where(Challenge.parcours_id == p.id_code).order_by(
col(Challenge.mistakes), col(Challenge.time)).where(Challenge.mistakes < c.best,
Challenge.time < c.best_time)).one()
return noteRank + 1
def getAvgRank(c: Challenger, p: Parcours, db: Session):
avgRank = db.exec(select([func.count(Challenger.member_id)]).where(
Challenger.parcours_id == p.id).order_by(col(Challenger.avg)).where(Challenger.avg < c.avg)).one()
return avgRank + 1
def getMemberRank(m: Member, p: Parcours, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
if challenger is None or challenger.best is None:
return None
return getRank(challenger, p, db)
def getMemberAvgRank(m: Member, p: Parcours, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
if challenger is None or challenger.avg is None:
return None
return getAvgRank(challenger, p, db)
def getMemberValidated(m: Member, p: Parcours, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
if challenger is None or challenger.validated is None:
return None
return challenger.validated
def serialize_parcours(parcours: Parcours, member: Member, db: Session):
tops = getTops(parcours, db)
avgTop = getAvgTops(parcours, db)
challenger = db.exec(select(Challenger).where(
Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first()
noteRank = None
avgRank = None
pb = None
if challenger is not None and challenger.avg is not None and challenger.best is not None:
noteRank = getRank(challenger, parcours, db)
avgRank = getAvgRank(challenger, parcours, db)
pb = {"mistakes": challenger.best, "time": challenger.best_time}
statement = select(Challenger).where(Challenger.parcours_id == parcours.id)
if not member.is_admin:
statement = statement.where(Challenger.member_id == member.id)
challengers = db.exec(statement).all()
challs = {c.member.id_code: {
"challenger": {"id_code": c.member.id_code, "name": getUsername(c.member), "validated": c.validated},
# 'validated': chall.mistakes <= parcours.max_mistakes
"challenges": [Challenges(**{**chall.dict(), "canCorrige": chall.data != []}) for chall in getChallenges(c, db)]
} for c in challengers}
return {**parcours.dict(), "pb": pb, "tops": tops, "challenges": challs, "rank": noteRank, "memberRank": avgRank,
"validated": challenger.validated if challenger != None else False, "ranking": avgTop}
def change_anonymous_clientId(anonymous: Anonymous, db: Session):
anonymous.clientId = uuid.uuid4()
db.add(anonymous)
db.commit()
db.refresh(anonymous)
return anonymous
# Parcours
from services.io import add_fast_api_root
from generateur.generateur_main import generate_from_path, parseGeneratorOut
def countInput(ex: Exercice, q: int):
exo = parseGeneratorOut(generate_from_path(add_fast_api_root(
ex.exo_source), 1, "web"))
return len(exo.inputs) * q
class ExoToCount(BaseModel):
ex: Exercice
q: int
def getTotal(exs: list[ExoToCount]):
total = 0
for e in exs:
total += countInput(e.ex, e.q)
return total
def validate_exercices(exos: List[ExercicesCreate], db: Session):
exercices = db.exec(select(Exercice).where(Exercice.web == True).where(
col(Exercice.id_code).in_([e.exercice_id for e in exos]))).all()
exos_id_list = [e.exercice_id for e in exos]
# exoToCountList = [ExoToCount(ex=e, q=q) for e, q in zip(exercices, [c.quantity for c in exos])]
exercices.sort(key=lambda e: exos_id_list.index(e.id_code))
return [Exercices(exercice_id=e.id_code, name=e.name,
quantity=[ex for ex in exos if ex.exercice_id == e.id_code][0].quantity,
examples=e.examples).dict() for e in exercices]
def create_parcours_db(parcours: ParcoursCreate, room_id: int, db: Session):
exercices = validate_exercices(parcours.exercices, db)
if len(exercices) == 0:
return "Veuillez entrer au moins un exercice valide"
id_code = generate_unique_code(Parcours, s=db)
parcours_obj = Parcours(
**{**parcours.dict(), "exercices": exercices}, room_id=room_id, id_code=id_code)
db.add(parcours_obj)
db.commit()
db.refresh(parcours_obj)
return parcours_obj
def deleteParcoursRelated(parcours: Parcours, db: Session):
db.exec(delete(Challenge).where(Challenge.parcours_id == parcours.id_code))
db.exec(delete(TmpCorrection).where(
TmpCorrection.parcours_id == parcours.id_code))
db.exec(delete(Challenger).where(Challenger.parcours_id == parcours.id))
db.commit()
def change_challengers_validation(p: Parcours, db: Session):
stmt = update(Challenger).values(
validated=select(Challenge.id).where(Challenge.challenger_mid == Challenger.member_id,
Challenge.challenger_pid == Challenger.parcours_id,
Challenge.validated == 1).exists()).where(
Challenger.parcours_id == p.id)
db.execute(stmt)
db.commit()
return
def change_challenges_validation(p: Parcours, db: Session):
challenges = db.exec(select(Challenge).where(
Challenge.parcours_id == p.id_code)).all()
challs = []
for c in challenges:
validated = c.time <= p.time * 60 and c.mistakes <= p.max_mistakes
if validated != c.validated:
c.validated = validated
challs.append(c)
db.bulk_save_objects(challs)
db.commit()
def changeValidation(p: Parcours, db: Session):
change_challenges_validation(p, db)
change_challengers_validation(p, db)
def compareExercices(old: list[Exercices], new: list[ExercicesCreate]):
old = [{"id": o['exercice_id'], "q": o['quantity']} for o in old]
new = [{"id": n.exercice_id, "q": n.quantity} for n in new]
return old == new
def update_parcours_db(parcours: ParcoursCreate, parcours_obj: Parcours, db: Session):
update_challenges = False
if not compareExercices(parcours_obj.exercices, parcours.exercices):
exercices = validate_exercices(parcours.exercices, db)
if len(exercices) == 0:
return "Veuillez entrer au moins un exercice valide"
deleteParcoursRelated(parcours_obj, db)
update_challenges = True
parcours_obj.exercices = exercices
update_validated = parcours_obj.max_mistakes != parcours.max_mistakes or parcours_obj.time != parcours.time
parcours_obj.name = parcours.name
parcours_obj.time = parcours.time
parcours_obj.max_mistakes = parcours.max_mistakes
db.add(parcours_obj)
db.commit()
db.refresh(parcours_obj)
if update_validated:
changeValidation(parcours_obj, db)
return parcours_obj, update_challenges
def delete_parcours_db(parcours: Parcours, db: Session):
db.delete(parcours)
db.commit()
return True
class CorrigedChallenge(BaseModel):
data: List[List[CorrigedGeneratorOut]]
mistakes: int
isCorriged: bool
def create_tmp_correction(data: List[CorrigedData], parcours_id: str, member: Member, db: Session):
code = generate_unique_code(TmpCorrection, s=db)
tmpCorr = TmpCorrection(data=data, id_code=code,
member=member, parcours_id=parcours_id)
db.add(tmpCorr)
db.commit()
db.refresh(tmpCorr)
return tmpCorr
def validate_challenge_input(obj: List[CorrectionData], corr: TmpCorrection):
data = corr.data
if len(obj) != len(data):
return False
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
if len(exo.data) != len(exo_corr['data']):
return
zipped = zip(exo_corr['data'], exo.data)
same = all([e['calcul'] == f.calcul and len(e['inputs'])
== len(f.inputs) for e, f in zipped])
if not same:
return False
return True
def validate_challenge_correction(obj: List[CorrigedData], chall: Challenge):
data = chall.data
if len(obj) != len(data):
return False
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
if len(exo.data) != len(exo_corr['data']):
return
zipped = zip(exo_corr['data'], exo.data)
same = all([e['calcul'] == f.calcul and len(e['inputs'])
== len(f.inputs) for e, f in zipped])
if not same:
return False
return True
def corrige_challenge(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection) -> CorrigedChallenge:
if validate_challenge_input(obj, corr) is False:
return None
data = corr.data
note = 0
total = 0
isCorriged = True
mistakes = 0
for i in range(len(data)):
exo_corr = data[i]["data"]
exo = obj[i].data
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
for e, f in zipped:
for k, l in zip(e['inputs'], f.inputs):
k["value"] = str(l.value)
total += 1
if k['correction'] is None:
isCorriged = False
k['valid'] = None
elif str(k["correction"]) == str(l.value):
k['valid'] = True
note += 1
else:
k['valid'] = False
mistakes += 1
return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged}
return {"data": data, "mistakes": mistakes, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
def change_correction(obj: List[CorrigedData], chall: Challenge) -> CorrigedChallenge:
if validate_challenge_correction(obj, chall) is False:
return None
data = deepcopy(chall.data)
note = 0
total = 0
isCorriged = True
mistakes = 0
for i in range(len(data)):
exo_corr = data[i]['data']
exo = obj[i].data
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
for e, f in zipped:
for k, l in zip(e['inputs'], f.inputs):
k["correction"] = l.correction
k["valid"] = l.valid
total += 1
if k['correction'] is None and l.valid is None:
isCorriged = False
if l.valid is True:
note += 1
else:
mistakes += 1
return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged}
return {"data": data, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
def getChallenger(parcours: Parcours, member: Member, db: Session):
challenger = db.exec(select(Challenger).where(
Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first()
if challenger is None:
return Challenger(parcours_id=parcours.id, member_id=member.id)
return challenger
def ChallengerFromChallenge(c: Challenge, db: Session):
challenger = db.exec(select(Challenger).where(
Challenger.member_id == c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first()
return challenger
def checkValidated(challenger: Challenger, db: Session, challenge: Challenge | None = None):
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == challenger.member_id,
Challenge.challenger_pid == challenger.parcours_id,
Challenge.validated == True)).all()
if challenge is not None:
challenges = [c for c in challenges if c.id != challenge.id]
return len(challenges) != 0
def create_challenge(data: List[CorrigedData], challenger: Member, parcours: Parcours, time: int, mistakes: int,
isCorriged: bool, db: Session):
challenger_obj: Challenger = getChallenger(parcours, challenger, db)
validated = time <= parcours.time * 60 and mistakes <= parcours.max_mistakes
challenge = Challenge(data=data, challenger_pid=challenger_obj.parcours_id, challenger_mid=challenger_obj.member_id,
parcours=parcours, time=time, mistakes=mistakes, isCorriged=isCorriged,
id_code=generate_unique_code(Challenge, s=db), validated=validated)
if (challenger_obj.best is not None and challenger_obj.best > mistakes) or challenger_obj.best is None:
challenger_obj.best = mistakes
challenger_obj.best_time = time
challenges = db.exec(select([func.count(Challenge.id)]).where(
Challenge.challenger_mid == challenger_obj.member_id, Challenge.challenger_pid == parcours.id)).one()
if validated and challenger_obj.validated is False:
challenger_obj.validated = True
avg = challenger_obj.avg
if avg is None:
avg = 0
challenger_obj.avg = (avg *
(challenges - 1) + mistakes) / (challenges)
db.add(challenge)
db.add(challenger_obj)
db.commit()
db.refresh(challenge)
db.refresh(challenger_obj)
return challenge, challenger_obj
def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session):
challenger = ChallengerFromChallenge(challenge, db)
challengesCount = len(getChallenges(challenger, db))
avg = challenger.avg * challengesCount - challenge.mistakes
parcours = challenge.parcours
if challenger.best > corriged['mistakes']:
challenger.best = corriged['mistakes']
challenger.best_time = challenge.time
validated = challenge.time <= parcours.time * 60 and corriged['mistakes'] <= parcours.max_mistakes
challenge.validated = validated
if challenger.validated == False and validated:
challenger.validated = True
elif challenger.validated == True and not validated:
challenger.validated = checkValidated(challenger, db, challenge)
challenger.avg = (avg + corriged['mistakes']) / challengesCount
challenge.data = corriged['data']
challenge.mistakes = corriged['mistakes']
challenge.isCorriged = corriged['isCorriged']
# challenge.validated = corriged['mistakes'] <= parcours.max_mistakes
db.add(challenge)
db.add(challenger)
db.commit()
db.refresh(challenge)
db.refresh(challenger)
return challenge, challenger
# Dependencies
def check_room(room_id: str, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
return room
def get_room(room_id, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Salle introuvable")
return room
def get_member_dep(room: Room = Depends(get_room), user: User = Depends(get_current_user_optional),
clientId: str | None = Query(default=None), db: Session = Depends(get_session)):
if user is None and clientId is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
if user is not None:
member = get_member_from_user(user.id, room.id, db)
elif clientId is not None:
member = get_member_from_clientId(clientId, room.id, db)
if member is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous n'êtes pas dans cette salle")
return member
def check_admin(member: Member = Depends(get_member_dep)):
if member.is_admin is False:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous devez être administrateur pour faire cela")
return member
def get_parcours(parcours_id: str, room: Room = Depends(get_room), db: Session = Depends(get_session)):
room = db.exec(select(Parcours).where(Parcours.id_code ==
parcours_id, Parcours.room_id == room.id)).first()
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Parcours introuvable")
return room
def get_exercices(parcours: Parcours = Depends(get_parcours), db: Session = Depends(get_session)):
exercices = db.exec(select(Exercice).where(col(Exercice.id_code).in_(
[e['exercice_id'] for e in parcours.exercices]))).all()
return [{"exercice": e, "quantity": [q for q in parcours.exercices if q['exercice_id'] == e.id_code][0]['quantity']}
for e in exercices]
def get_correction(correction_id: str, parcours_id: str, member: Member = Depends(get_member_dep),
db: Session = Depends(get_session)):
tmpCorr = db.exec(select(TmpCorrection).where(
TmpCorrection.id_code == correction_id, TmpCorrection.parcours_id == parcours_id)).first()
if tmpCorr is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Correction introuvable")
if member != tmpCorr.member:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="It's not your challenge")
return tmpCorr
def get_challenge(challenge_id: str, db: Session = Depends(get_session)):
challenge = db.exec(select(Challenge).where(
Challenge.id_code == challenge_id)).first()
if challenge is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge introuvable")
if challenge.data == []:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Impossible de corriger ce challenge")
return challenge