861 lines
30 KiB
Python
861 lines
30 KiB
Python
import uuid
|
|
from copy import deepcopy
|
|
from typing import List
|
|
from uuid import uuid4
|
|
|
|
from fastapi import Depends, HTTPException, status, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import func
|
|
from sqlmodel import Session, delete, select, col, update
|
|
|
|
from database.auth.crud import get_user_from_token
|
|
from database.auth.models import User
|
|
from database.db import get_session
|
|
from database.exercices.models import Exercice
|
|
from database.room.models import Anonymous, Challenge, Challenges, CorrigedGeneratorOut, Exercices, ExercicesCreate, \
|
|
Member, Note, Parcours, ParcoursCreate, ParcoursReadShort, ParsedGeneratorOut, Room, RoomCreate, RoomInfo, \
|
|
TmpCorrection, Waiter, MemberRead, CorrigedData, CorrectionData, Challenger
|
|
from services.auth import get_current_user_optional
|
|
from services.database import generate_unique_code
|
|
|
|
|
|
def create_room_db(*, room: RoomCreate, user: User | None = None, username: str | None = None, db: Session):
|
|
id_code = generate_unique_code(Room, s=db)
|
|
member_id = generate_unique_code(Member, s=db)
|
|
room_obj = Room(**room.dict(exclude_unset=True), id_code=id_code)
|
|
if user is not None:
|
|
member = Member(user_id=user.id, room=room_obj,
|
|
is_admin=True, id_code=member_id)
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
|
|
if username is not None:
|
|
# reconnect_code = generate_unique_code(
|
|
# Anonymous, s=db, field_name='reconnect_code')
|
|
# anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
|
|
# member = Member(anonymous=anonymous, room=room_obj,
|
|
# is_admin=True, id_code=member_id)
|
|
member = create_anonymous_member(username, room_obj, db)
|
|
if member is None:
|
|
raise ValueError('Nom d\'utilisateur invalide (4 - 15 caractères)')
|
|
member.is_admin = True
|
|
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
if username is None and user is None:
|
|
raise ValueError('Username or user required')
|
|
|
|
return {"room": room_obj, "member": member}
|
|
|
|
|
|
def change_room_name(room: Room, name: str, db: Session):
|
|
room.name = name
|
|
db.add(room)
|
|
db.commit()
|
|
db.refresh(room)
|
|
return room
|
|
|
|
|
|
def change_room_status(room: Room, public: bool, db: Session):
|
|
room.public = public
|
|
db.add(room)
|
|
db.commit()
|
|
db.refresh(room)
|
|
return room
|
|
|
|
|
|
def delete_room_db(room: Room, db: Session):
|
|
db.delete(room)
|
|
db.commit()
|
|
return True
|
|
|
|
|
|
def get_member_from_user(user_id: int, room_id: int, db: Session):
|
|
member = db.exec(select(Member).where(Member.room_id ==
|
|
room_id, Member.user_id == user_id)).first()
|
|
return member
|
|
|
|
|
|
def get_member_from_token(token: str, room_id: int, db: Session):
|
|
user = get_user_from_token(token, db)
|
|
if user is False:
|
|
return False
|
|
if user is None:
|
|
return None
|
|
member = get_member_from_user(user.id, room_id, db)
|
|
return member
|
|
|
|
|
|
def get_member_from_anonymous(anonymous_id: int, room_id: int, db: Session):
|
|
member = db.exec(select(Member).where(Member.room_id ==
|
|
room_id, Member.anonymous_id == anonymous_id)).first()
|
|
return member
|
|
|
|
|
|
def get_member_from_reconnect_code(reconnect_code: str, room_id: int, db: Session):
|
|
anonymous = get_anonymous_from_code(reconnect_code, db)
|
|
if anonymous is None:
|
|
return None
|
|
member = get_member_from_anonymous(anonymous.id, room_id, db)
|
|
return member
|
|
|
|
|
|
def get_anonymous_from_code(reconnect_code: str, db: Session):
|
|
anonymous = db.exec(select(Anonymous).where(
|
|
Anonymous.reconnect_code == reconnect_code)).first()
|
|
return anonymous
|
|
|
|
|
|
def get_anonymous_from_clientId(clientId: str, db: Session):
|
|
anonymous = db.exec(select(Anonymous).where(
|
|
Anonymous.clientId == clientId)).first()
|
|
return anonymous
|
|
|
|
|
|
def get_member_from_clientId(clientId: str, room_id: int, db: Session):
|
|
anonymous = get_anonymous_from_clientId(clientId, db)
|
|
if anonymous is None:
|
|
return None
|
|
member = get_member_from_anonymous(anonymous.id, room_id, db)
|
|
return member
|
|
|
|
|
|
def create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False,
|
|
db: Session):
|
|
member_id = generate_unique_code(Member, s=db)
|
|
member = Member(room=room, user=user, anonymous=anonymous, waiting=waiting,
|
|
id_code=member_id)
|
|
member.online = True
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def get_or_create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None,
|
|
waiting: bool = False, db: Session):
|
|
member = user is not None and get_member_from_user(user.id, room.id, db)
|
|
if member is not None and member is not False:
|
|
return member
|
|
member = create_member(room=room, user=user,
|
|
anonymous=anonymous, waiting=waiting, db=db)
|
|
|
|
|
|
def connect_member(member: Member, db: Session):
|
|
member.online = True
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def disconnect_member(member: Member, db: Session):
|
|
if member.waiting == False:
|
|
member.online = False
|
|
|
|
if member.anonymous is not None:
|
|
change_anonymous_clientId(member.anonymous, db)
|
|
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
else:
|
|
db.delete(member)
|
|
db.commit()
|
|
return member
|
|
|
|
|
|
def validate_username(username: str, room: Room, db: Session = Depends(get_session)):
|
|
username = username.strip()
|
|
if not (4 <= len(username) <= 15):
|
|
return None
|
|
members = select(Member.anonymous_id).where(
|
|
Member.room_id == room.id, Member.anonymous_id != None)
|
|
anonymous = select(Anonymous).where(
|
|
col(Anonymous.id).in_(members), Anonymous.username == username)
|
|
username_anonymous = db.exec(anonymous).first()
|
|
return None if username_anonymous is not None else username
|
|
|
|
def create_anonymous(username: str, room: Room, db: Session):
|
|
username = validate_username(username, room, db)
|
|
if username is None:
|
|
return None
|
|
reconnect_code = generate_unique_code(
|
|
Anonymous, s=db, field_name="reconnect_code")
|
|
anonymous = Anonymous(username=username, reconnect_code=reconnect_code, clientId=uuid4())
|
|
db.add(anonymous)
|
|
db.commit()
|
|
db.refresh(anonymous)
|
|
return anonymous
|
|
|
|
|
|
def create_anonymous_member(username: str, room: Room, db: Session):
|
|
anonymous = create_anonymous(username, room, db)
|
|
if anonymous is None:
|
|
return None
|
|
member_id = generate_unique_code(Member, s=db)
|
|
member = Member(room=room, anonymous=anonymous, id_code=member_id)
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
|
|
def check_user_in_room(user_id: int, room_id: int, db: Session):
|
|
user = db.exec(select(Member).where(Member.user_id ==
|
|
user_id, Member.room_id == room_id)).first()
|
|
return user
|
|
|
|
|
|
def create_user_member(user: User, room: Room, db: Session):
|
|
member = get_member_from_user(user.id, room.id, db)
|
|
if member is not None:
|
|
return None
|
|
member_id = generate_unique_code(Member, s=db)
|
|
member = Member(room=room, user=user, id_code=member_id)
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def create_anonymous_waiter(username: str, room: Room, db: Session):
|
|
username = validate_username(username, room, db)
|
|
if username is None:
|
|
return None
|
|
reconnect_code = generate_unique_code(
|
|
Anonymous, s=db, field_name="reconnect_code")
|
|
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
|
|
|
|
member_id = generate_unique_code(Member, s=db)
|
|
member = Member(room=room, anonymous=anonymous,
|
|
waiting=True, id_code=member_id)
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def create_user_waiter(user: User, room: Room, db: Session):
|
|
member = get_member_from_user(user.id, room.id, db)
|
|
if member is not None:
|
|
return member
|
|
|
|
member = create_member(room=room, user=user, waiting=True,
|
|
db=db)
|
|
return member
|
|
|
|
|
|
def get_waiter(waiter_code: str, db: Session):
|
|
return db.exec(select(Member).where(Member.id_code == waiter_code, Member.waiting == True)).first()
|
|
|
|
|
|
def get_member(id_code: str, room_id: str, db: Session):
|
|
return db.exec(select(Member).where(Member.id_code == id_code, Member.room_id == room_id)).first()
|
|
|
|
|
|
def delete_member(member: Member, db: Session):
|
|
db.delete(member)
|
|
db.commit()
|
|
return None
|
|
|
|
|
|
def accept_waiter(member: Member, db: Session):
|
|
member.waiting = False
|
|
member.waiter_code = None
|
|
member.online = True
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def refuse_waiter(member: Member, db: Session):
|
|
db.delete(member)
|
|
db.commit()
|
|
return None
|
|
|
|
|
|
def leave_room(member: Member, db: Session):
|
|
# db.execute(delete(Challenger).where(col(Challenger.member_id) == member.id))
|
|
# db.execute(delete(TmpCorrection).where(col(TmpCorrection.member_id) == member.id))
|
|
db.delete(member)
|
|
db.commit()
|
|
return None
|
|
|
|
|
|
def serialize_member(member: Member, private: bool = False, admin: bool = False,
|
|
m2: Member | None = None) -> MemberRead | Waiter:
|
|
member_obj = member.user or member.anonymous
|
|
|
|
if not member.waiting:
|
|
return MemberRead(username=member_obj.username, online=member.online,
|
|
clientId=str(member_obj.clientId) if (private == True and member.user_id == None) else "",
|
|
reconnect_code=getattr(member_obj, "reconnect_code", "") if (admin or m2 == member) else "",
|
|
isUser=member.user_id != None, isAdmin=member.is_admin, id_code=member.id_code).dict()
|
|
if member.waiting:
|
|
return Waiter(username=member_obj.username, waiter_id=member.id_code).dict()
|
|
|
|
|
|
def serialize_parcours_short(parcours: Parcours, member: Member, db: Session):
|
|
challenger = getChallenger(parcours, member, db)
|
|
|
|
return ParcoursReadShort(name=parcours.name, id_code=parcours.id_code, best_note=challenger.best,
|
|
validated=challenger.validated, avg=challenger.avg)
|
|
|
|
|
|
def serialize_challenge(challenge: Challenge):
|
|
return Challenges(
|
|
name=challenge.challenger.user.username if challenge.challenger.user is not None else challenge.challenger.anonymous.username,
|
|
value=Note(note=challenge.note, time=challenge.time), isCorriged=challenge.isCorriged,
|
|
canCorrige=challenge.data is not None)
|
|
|
|
|
|
def serialize_room(room: Room, member: Member, db: Session):
|
|
return RoomInfo(**room.dict(), parcours=[serialize_parcours_short(p, member, db) for p in room.parcours],
|
|
members=[serialize_member(m, admin=member.is_admin, m2=member) for m in room.members])
|
|
|
|
|
|
def getUsername(m: Member):
|
|
return m.user.username if m.user is not None else m.anonymous.username
|
|
|
|
|
|
def getChallengerInfo(c: Challenge, db: Session):
|
|
challenger = db.exec(select(Challenger).where(Challenger.member_id ==
|
|
c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first()
|
|
if challenger is not None:
|
|
member = challenger.member
|
|
return {"name": getUsername(member), "id_code": member.id_code}
|
|
|
|
|
|
def getChallenges(c: Challenger, db: Session):
|
|
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == c.member_id,
|
|
Challenge.challenger_pid == c.parcours_id)).all()
|
|
return challenges
|
|
|
|
|
|
def getMemberChallenges(m: Member, p: Parcours, db: Session):
|
|
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == m.id,
|
|
Challenge.challenger_pid == p.id)).all()
|
|
return challenges
|
|
|
|
|
|
def getTops(p: Parcours, db: Session):
|
|
tops = db.exec(select(Challenge).where(Challenge.parcours_id == p.id_code).order_by(
|
|
col(Challenge.mistakes), col(Challenge.time)).limit(3)).all()
|
|
|
|
tops = [{"challenger": getChallengerInfo(
|
|
t, db), "mistakes": t.mistakes, "time": t.time} for t in tops]
|
|
return tops
|
|
|
|
|
|
def getAvgTops(p: Parcours, db: Session):
|
|
avgTop = db.exec(select(Challenger).where(Challenger.parcours_id ==
|
|
p.id).order_by(col(Challenger.avg)).limit(3)).all()
|
|
|
|
avgTop = [{"id_code": t.member.id_code, "avg": t.avg,
|
|
"name": getUsername(t.member)} for t in avgTop]
|
|
return avgTop
|
|
|
|
|
|
def getRank(c: Challenger, p: Parcours, db: Session):
|
|
noteRank = db.exec(select([func.count(Challenge.id)]).where(Challenge.parcours_id == p.id_code).order_by(
|
|
col(Challenge.mistakes), col(Challenge.time)).where(Challenge.mistakes < c.best,
|
|
Challenge.time < c.best_time)).one()
|
|
return noteRank + 1
|
|
|
|
|
|
def getAvgRank(c: Challenger, p: Parcours, db: Session):
|
|
avgRank = db.exec(select([func.count(Challenger.member_id)]).where(
|
|
Challenger.parcours_id == p.id).order_by(col(Challenger.avg)).where(Challenger.avg < c.avg)).one()
|
|
return avgRank + 1
|
|
|
|
|
|
def getMemberRank(m: Member, p: Parcours, db: Session):
|
|
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
|
|
if challenger is None or challenger.best is None:
|
|
return None
|
|
return getRank(challenger, p, db)
|
|
|
|
|
|
def getMemberAvgRank(m: Member, p: Parcours, db: Session):
|
|
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
|
|
|
|
if challenger is None or challenger.avg is None:
|
|
return None
|
|
return getAvgRank(challenger, p, db)
|
|
|
|
|
|
def getMemberValidated(m: Member, p: Parcours, db: Session):
|
|
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
|
|
if challenger is None or challenger.validated is None:
|
|
return None
|
|
return challenger.validated
|
|
|
|
|
|
def serialize_parcours(parcours: Parcours, member: Member, db: Session):
|
|
tops = getTops(parcours, db)
|
|
avgTop = getAvgTops(parcours, db)
|
|
|
|
challenger = db.exec(select(Challenger).where(
|
|
Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first()
|
|
|
|
noteRank = None
|
|
avgRank = None
|
|
pb = None
|
|
if challenger is not None and challenger.avg is not None and challenger.best is not None:
|
|
noteRank = getRank(challenger, parcours, db)
|
|
avgRank = getAvgRank(challenger, parcours, db)
|
|
pb = {"mistakes": challenger.best, "time": challenger.best_time}
|
|
|
|
statement = select(Challenger).where(Challenger.parcours_id == parcours.id)
|
|
if not member.is_admin:
|
|
statement = statement.where(Challenger.member_id == member.id)
|
|
|
|
challengers = db.exec(statement).all()
|
|
|
|
challs = {c.member.id_code: {
|
|
"challenger": {"id_code": c.member.id_code, "name": getUsername(c.member), "validated": c.validated},
|
|
# 'validated': chall.mistakes <= parcours.max_mistakes
|
|
"challenges": [Challenges(**{**chall.dict(), "canCorrige": chall.data != []}) for chall in getChallenges(c, db)]
|
|
} for c in challengers}
|
|
|
|
return {**parcours.dict(), "pb": pb, "tops": tops, "challenges": challs, "rank": noteRank, "memberRank": avgRank,
|
|
"validated": challenger.validated if challenger != None else False, "ranking": avgTop}
|
|
|
|
|
|
def change_anonymous_clientId(anonymous: Anonymous, db: Session):
|
|
anonymous.clientId = uuid.uuid4()
|
|
|
|
db.add(anonymous)
|
|
db.commit()
|
|
db.refresh(anonymous)
|
|
return anonymous
|
|
|
|
|
|
# Parcours
|
|
from services.io import add_fast_api_root
|
|
from generateur.generateur_main import generate_from_path, parseGeneratorOut
|
|
|
|
|
|
def countInput(ex: Exercice, q: int):
|
|
exo = parseGeneratorOut(generate_from_path(add_fast_api_root(
|
|
ex.exo_source), 1, "web"))
|
|
return len(exo.inputs) * q
|
|
|
|
|
|
class ExoToCount(BaseModel):
|
|
ex: Exercice
|
|
q: int
|
|
|
|
|
|
def getTotal(exs: list[ExoToCount]):
|
|
total = 0
|
|
for e in exs:
|
|
total += countInput(e.ex, e.q)
|
|
return total
|
|
|
|
|
|
def validate_exercices(exos: List[ExercicesCreate], db: Session):
|
|
exercices = db.exec(select(Exercice).where(Exercice.web == True).where(
|
|
col(Exercice.id_code).in_([e.exercice_id for e in exos]))).all()
|
|
exos_id_list = [e.exercice_id for e in exos]
|
|
# exoToCountList = [ExoToCount(ex=e, q=q) for e, q in zip(exercices, [c.quantity for c in exos])]
|
|
exercices.sort(key=lambda e: exos_id_list.index(e.id_code))
|
|
return [Exercices(exercice_id=e.id_code, name=e.name,
|
|
quantity=[ex for ex in exos if ex.exercice_id == e.id_code][0].quantity,
|
|
examples=e.examples).dict() for e in exercices]
|
|
|
|
|
|
def create_parcours_db(parcours: ParcoursCreate, room_id: int, db: Session):
|
|
exercices = validate_exercices(parcours.exercices, db)
|
|
if len(exercices) == 0:
|
|
return "Veuillez entrer au moins un exercice valide"
|
|
id_code = generate_unique_code(Parcours, s=db)
|
|
|
|
parcours_obj = Parcours(
|
|
**{**parcours.dict(), "exercices": exercices}, room_id=room_id, id_code=id_code)
|
|
|
|
db.add(parcours_obj)
|
|
db.commit()
|
|
db.refresh(parcours_obj)
|
|
return parcours_obj
|
|
|
|
|
|
def deleteParcoursRelated(parcours: Parcours, db: Session):
|
|
db.exec(delete(Challenge).where(Challenge.parcours_id == parcours.id_code))
|
|
db.exec(delete(TmpCorrection).where(
|
|
TmpCorrection.parcours_id == parcours.id_code))
|
|
db.exec(delete(Challenger).where(Challenger.parcours_id == parcours.id))
|
|
|
|
db.commit()
|
|
|
|
|
|
def change_challengers_validation(p: Parcours, db: Session):
|
|
stmt = update(Challenger).values(
|
|
validated=select(Challenge.id).where(Challenge.challenger_mid == Challenger.member_id,
|
|
Challenge.challenger_pid == Challenger.parcours_id,
|
|
Challenge.validated == 1).exists()).where(
|
|
Challenger.parcours_id == p.id)
|
|
db.execute(stmt)
|
|
db.commit()
|
|
return
|
|
|
|
|
|
def change_challenges_validation(p: Parcours, db: Session):
|
|
challenges = db.exec(select(Challenge).where(
|
|
Challenge.parcours_id == p.id_code)).all()
|
|
challs = []
|
|
for c in challenges:
|
|
validated = c.time <= p.time * 60 and c.mistakes <= p.max_mistakes
|
|
if validated != c.validated:
|
|
c.validated = validated
|
|
challs.append(c)
|
|
|
|
db.bulk_save_objects(challs)
|
|
db.commit()
|
|
|
|
|
|
def changeValidation(p: Parcours, db: Session):
|
|
change_challenges_validation(p, db)
|
|
|
|
change_challengers_validation(p, db)
|
|
|
|
|
|
def compareExercices(old: list[Exercices], new: list[ExercicesCreate]):
|
|
old = [{"id": o['exercice_id'], "q": o['quantity']} for o in old]
|
|
new = [{"id": n.exercice_id, "q": n.quantity} for n in new]
|
|
return old == new
|
|
|
|
|
|
def update_parcours_db(parcours: ParcoursCreate, parcours_obj: Parcours, db: Session):
|
|
update_challenges = False
|
|
|
|
if not compareExercices(parcours_obj.exercices, parcours.exercices):
|
|
exercices = validate_exercices(parcours.exercices, db)
|
|
if len(exercices) == 0:
|
|
return "Veuillez entrer au moins un exercice valide"
|
|
deleteParcoursRelated(parcours_obj, db)
|
|
update_challenges = True
|
|
parcours_obj.exercices = exercices
|
|
|
|
update_validated = parcours_obj.max_mistakes != parcours.max_mistakes or parcours_obj.time != parcours.time
|
|
parcours_obj.name = parcours.name
|
|
parcours_obj.time = parcours.time
|
|
parcours_obj.max_mistakes = parcours.max_mistakes
|
|
|
|
db.add(parcours_obj)
|
|
db.commit()
|
|
|
|
db.refresh(parcours_obj)
|
|
if update_validated:
|
|
changeValidation(parcours_obj, db)
|
|
|
|
return parcours_obj, update_challenges
|
|
|
|
|
|
def delete_parcours_db(parcours: Parcours, db: Session):
|
|
db.delete(parcours)
|
|
db.commit()
|
|
return True
|
|
|
|
|
|
class CorrigedChallenge(BaseModel):
|
|
data: List[List[CorrigedGeneratorOut]]
|
|
mistakes: int
|
|
isCorriged: bool
|
|
|
|
|
|
def create_tmp_correction(data: List[CorrigedData], parcours_id: str, member: Member, db: Session):
|
|
code = generate_unique_code(TmpCorrection, s=db)
|
|
tmpCorr = TmpCorrection(data=data, id_code=code,
|
|
member=member, parcours_id=parcours_id)
|
|
db.add(tmpCorr)
|
|
db.commit()
|
|
db.refresh(tmpCorr)
|
|
|
|
return tmpCorr
|
|
|
|
|
|
def validate_challenge_input(obj: List[CorrectionData], corr: TmpCorrection):
|
|
data = corr.data
|
|
if len(obj) != len(data):
|
|
return False
|
|
for i in range(len(data)):
|
|
exo_corr = data[i]
|
|
exo = obj[i]
|
|
|
|
|
|
if len(exo.data) != len(exo_corr['data']):
|
|
return
|
|
zipped = zip(exo_corr['data'], exo.data)
|
|
same = all([e['calcul'] == f.calcul and len(e['inputs'])
|
|
== len(f.inputs) for e, f in zipped])
|
|
if not same:
|
|
return False
|
|
return True
|
|
|
|
|
|
def validate_challenge_correction(obj: List[CorrigedData], chall: Challenge):
|
|
data = chall.data
|
|
if len(obj) != len(data):
|
|
return False
|
|
for i in range(len(data)):
|
|
exo_corr = data[i]
|
|
exo = obj[i]
|
|
if len(exo.data) != len(exo_corr['data']):
|
|
return
|
|
zipped = zip(exo_corr['data'], exo.data)
|
|
same = all([e['calcul'] == f.calcul and len(e['inputs'])
|
|
== len(f.inputs) for e, f in zipped])
|
|
if not same:
|
|
return False
|
|
return True
|
|
|
|
|
|
def corrige_challenge(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection) -> CorrigedChallenge:
|
|
if validate_challenge_input(obj, corr) is False:
|
|
return None
|
|
|
|
data = corr.data
|
|
note = 0
|
|
total = 0
|
|
isCorriged = True
|
|
mistakes = 0
|
|
for i in range(len(data)):
|
|
exo_corr = data[i]["data"]
|
|
exo = obj[i].data
|
|
if len(exo) != len(exo_corr):
|
|
return
|
|
zipped = zip(exo_corr, exo)
|
|
for e, f in zipped:
|
|
|
|
for k, l in zip(e['inputs'], f.inputs):
|
|
k["value"] = str(l.value)
|
|
total += 1
|
|
if k['correction'] is None:
|
|
isCorriged = False
|
|
k['valid'] = None
|
|
|
|
elif str(k["correction"]) == str(l.value):
|
|
k['valid'] = True
|
|
note += 1
|
|
else:
|
|
k['valid'] = False
|
|
mistakes += 1
|
|
|
|
return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged}
|
|
return {"data": data, "mistakes": mistakes, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
|
|
|
|
|
|
def change_correction(obj: List[CorrigedData], chall: Challenge) -> CorrigedChallenge:
|
|
if validate_challenge_correction(obj, chall) is False:
|
|
return None
|
|
data = deepcopy(chall.data)
|
|
note = 0
|
|
total = 0
|
|
isCorriged = True
|
|
mistakes = 0
|
|
for i in range(len(data)):
|
|
exo_corr = data[i]['data']
|
|
exo = obj[i].data
|
|
if len(exo) != len(exo_corr):
|
|
return
|
|
zipped = zip(exo_corr, exo)
|
|
for e, f in zipped:
|
|
for k, l in zip(e['inputs'], f.inputs):
|
|
k["correction"] = l.correction
|
|
k["valid"] = l.valid
|
|
total += 1
|
|
if k['correction'] is None and l.valid is None:
|
|
isCorriged = False
|
|
if l.valid is True:
|
|
note += 1
|
|
else:
|
|
mistakes += 1
|
|
|
|
return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged}
|
|
return {"data": data, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
|
|
|
|
|
|
def getChallenger(parcours: Parcours, member: Member, db: Session):
|
|
challenger = db.exec(select(Challenger).where(
|
|
Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first()
|
|
if challenger is None:
|
|
return Challenger(parcours_id=parcours.id, member_id=member.id)
|
|
return challenger
|
|
|
|
|
|
def ChallengerFromChallenge(c: Challenge, db: Session):
|
|
challenger = db.exec(select(Challenger).where(
|
|
Challenger.member_id == c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first()
|
|
return challenger
|
|
|
|
|
|
def checkValidated(challenger: Challenger, db: Session, challenge: Challenge | None = None):
|
|
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == challenger.member_id,
|
|
Challenge.challenger_pid == challenger.parcours_id,
|
|
Challenge.validated == True)).all()
|
|
if challenge is not None:
|
|
challenges = [c for c in challenges if c.id != challenge.id]
|
|
return len(challenges) != 0
|
|
|
|
|
|
def create_challenge(data: List[CorrigedData], challenger: Member, parcours: Parcours, time: int, mistakes: int,
|
|
isCorriged: bool, db: Session):
|
|
challenger_obj: Challenger = getChallenger(parcours, challenger, db)
|
|
|
|
validated = time <= parcours.time * 60 and mistakes <= parcours.max_mistakes
|
|
|
|
challenge = Challenge(data=data, challenger_pid=challenger_obj.parcours_id, challenger_mid=challenger_obj.member_id,
|
|
parcours=parcours, time=time, mistakes=mistakes, isCorriged=isCorriged,
|
|
id_code=generate_unique_code(Challenge, s=db), validated=validated)
|
|
|
|
if (challenger_obj.best is not None and challenger_obj.best > mistakes) or challenger_obj.best is None:
|
|
challenger_obj.best = mistakes
|
|
challenger_obj.best_time = time
|
|
|
|
challenges = db.exec(select([func.count(Challenge.id)]).where(
|
|
Challenge.challenger_mid == challenger_obj.member_id, Challenge.challenger_pid == parcours.id)).one()
|
|
|
|
if validated and challenger_obj.validated is False:
|
|
challenger_obj.validated = True
|
|
|
|
avg = challenger_obj.avg
|
|
if avg is None:
|
|
avg = 0
|
|
challenger_obj.avg = (avg *
|
|
(challenges - 1) + mistakes) / (challenges)
|
|
db.add(challenge)
|
|
db.add(challenger_obj)
|
|
db.commit()
|
|
db.refresh(challenge)
|
|
db.refresh(challenger_obj)
|
|
|
|
return challenge, challenger_obj
|
|
|
|
|
|
def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session):
|
|
challenger = ChallengerFromChallenge(challenge, db)
|
|
|
|
challengesCount = len(getChallenges(challenger, db))
|
|
avg = challenger.avg * challengesCount - challenge.mistakes
|
|
parcours = challenge.parcours
|
|
if challenger.best > corriged['mistakes']:
|
|
challenger.best = corriged['mistakes']
|
|
challenger.best_time = challenge.time
|
|
|
|
validated = challenge.time <= parcours.time * 60 and corriged['mistakes'] <= parcours.max_mistakes
|
|
challenge.validated = validated
|
|
|
|
if challenger.validated == False and validated:
|
|
challenger.validated = True
|
|
|
|
elif challenger.validated == True and not validated:
|
|
challenger.validated = checkValidated(challenger, db, challenge)
|
|
|
|
challenger.avg = (avg + corriged['mistakes']) / challengesCount
|
|
|
|
challenge.data = corriged['data']
|
|
challenge.mistakes = corriged['mistakes']
|
|
challenge.isCorriged = corriged['isCorriged']
|
|
# challenge.validated = corriged['mistakes'] <= parcours.max_mistakes
|
|
|
|
db.add(challenge)
|
|
db.add(challenger)
|
|
db.commit()
|
|
db.refresh(challenge)
|
|
db.refresh(challenger)
|
|
|
|
return challenge, challenger
|
|
|
|
|
|
# Dependencies
|
|
|
|
|
|
def check_room(room_id: str, db: Session = Depends(get_session)):
|
|
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
|
|
return room
|
|
|
|
|
|
def get_room(room_id, db: Session = Depends(get_session)):
|
|
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
|
|
if room is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Salle introuvable")
|
|
return room
|
|
|
|
|
|
def get_member_dep(room: Room = Depends(get_room), user: User = Depends(get_current_user_optional),
|
|
clientId: str | None = Query(default=None), db: Session = Depends(get_session)):
|
|
|
|
if user is None and clientId is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
|
|
member = None
|
|
if user is not None:
|
|
member = get_member_from_user(user.id, room.id, db)
|
|
elif clientId is not None:
|
|
member = get_member_from_clientId(clientId, room.id, db)
|
|
|
|
if member is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous n'êtes pas dans cette salle")
|
|
return member
|
|
|
|
|
|
def check_admin(member: Member = Depends(get_member_dep)):
|
|
if member.is_admin is False:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous devez être administrateur pour faire cela")
|
|
return member
|
|
|
|
|
|
def get_parcours(parcours_id: str, room: Room = Depends(get_room), db: Session = Depends(get_session)):
|
|
room = db.exec(select(Parcours).where(Parcours.id_code ==
|
|
parcours_id, Parcours.room_id == room.id)).first()
|
|
if room is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Parcours introuvable")
|
|
return room
|
|
|
|
|
|
def get_exercices(parcours: Parcours = Depends(get_parcours), db: Session = Depends(get_session)):
|
|
exercices = db.exec(select(Exercice).where(col(Exercice.id_code).in_(
|
|
[e['exercice_id'] for e in parcours.exercices]))).all()
|
|
|
|
return [{"exercice": e, "quantity": [q for q in parcours.exercices if q['exercice_id'] == e.id_code][0]['quantity']}
|
|
for e in exercices]
|
|
|
|
|
|
def get_correction(correction_id: str, parcours_id: str, member: Member = Depends(get_member_dep),
|
|
db: Session = Depends(get_session)):
|
|
tmpCorr = db.exec(select(TmpCorrection).where(
|
|
TmpCorrection.id_code == correction_id, TmpCorrection.parcours_id == parcours_id)).first()
|
|
if tmpCorr is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Correction introuvable")
|
|
if member != tmpCorr.member:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="It's not your challenge")
|
|
|
|
return tmpCorr
|
|
|
|
|
|
def get_challenge(challenge_id: str, db: Session = Depends(get_session)):
|
|
challenge = db.exec(select(Challenge).where(
|
|
Challenge.id_code == challenge_id)).first()
|
|
if challenge is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge introuvable")
|
|
|
|
if challenge.data == []:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Impossible de corriger ce challenge")
|
|
|
|
return challenge
|