generateur_v3/backend/api/database/room/crud.py

875 lines
31 KiB
Python
Raw Normal View History

2022-10-10 01:34:38 +02:00
import uuid
2023-02-22 12:43:39 +01:00
from copy import deepcopy
from typing import List
from fastapi import Depends, HTTPException, status, Query
2022-10-10 01:34:38 +02:00
from pydantic import BaseModel
2023-02-22 12:43:39 +01:00
from sqlalchemy import func
from sqlmodel import Session, delete, select, col
2022-09-25 22:32:19 +02:00
from database.auth.crud import get_user_from_token
2023-02-22 12:43:39 +01:00
from database.auth.models import User
from database.db import get_session
2022-10-10 01:34:38 +02:00
from database.exercices.models import Exercice
2023-02-22 12:43:39 +01:00
from database.room.models import Anonymous, Challenge, Challenges, CorrigedGeneratorOut, Exercices, ExercicesCreate, \
Member, Note, Parcours, ParcoursCreate, ParcoursReadShort, ParsedGeneratorOut, Room, RoomCreate, RoomInfo, \
TmpCorrection, Waiter, MemberRead, CorrigedData, CorrectionData, Challenger
from services.auth import get_current_user_optional
from services.database import generate_unique_code
2022-09-25 22:32:19 +02:00
2022-09-16 21:50:55 +02:00
2022-09-26 10:04:02 +02:00
def create_room_db(*, room: RoomCreate, user: User | None = None, username: str | None = None, db: Session):
id_code = generate_unique_code(Room, s=db)
member_id = generate_unique_code(Member, s=db)
room_obj = Room(**room.dict(exclude_unset=True), id_code=id_code)
if user is not None:
member = Member(user_id=user.id, room=room_obj,
is_admin=True, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
if username is not None:
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name='reconnect_code')
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
member = Member(anonymous=anonymous, room=room_obj,
is_admin=True, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
if username is None and user is None:
raise ValueError('Username or user required')
return {"room": room_obj, "member": member}
2022-09-18 22:43:04 +02:00
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
def change_room_name(room: Room, name: str, db: Session):
room.name = name
db.add(room)
db.commit()
db.refresh(room)
return room
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
def change_room_status(room: Room, public: bool, db: Session):
room.public = public
db.add(room)
db.commit()
db.refresh(room)
return room
2022-09-18 22:43:04 +02:00
2023-02-22 12:43:39 +01:00
2022-09-25 22:32:19 +02:00
def get_member_from_user(user_id: int, room_id: int, db: Session):
member = db.exec(select(Member).where(Member.room_id ==
room_id, Member.user_id == user_id)).first()
return member
def get_member_from_token(token: str, room_id: int, db: Session):
user = get_user_from_token(token, db)
if user is False:
return False
if user is None:
return None
member = get_member_from_user(user.id, room_id, db)
return member
def get_member_from_anonymous(anonymous_id: int, room_id: int, db: Session):
member = db.exec(select(Member).where(Member.room_id ==
room_id, Member.anonymous_id == anonymous_id)).first()
return member
def get_member_from_reconnect_code(reconnect_code: str, room_id: int, db: Session):
anonymous = get_anonymous_from_code(reconnect_code, db)
if anonymous is None:
return None
member = get_member_from_anonymous(anonymous.id, room_id, db)
return member
def get_anonymous_from_code(reconnect_code: str, db: Session):
anonymous = db.exec(select(Anonymous).where(
Anonymous.reconnect_code == reconnect_code)).first()
return anonymous
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
def get_anonymous_from_clientId(clientId: str, db: Session):
anonymous = db.exec(select(Anonymous).where(
Anonymous.clientId == clientId)).first()
return anonymous
def get_member_from_clientId(clientId: str, room_id: int, db: Session):
anonymous = get_anonymous_from_clientId(clientId, db)
if anonymous is None:
return None
member = get_member_from_anonymous(anonymous.id, room_id, db)
return member
2022-09-25 22:32:19 +02:00
2022-09-26 10:04:02 +02:00
def create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False, db: Session):
member_id = generate_unique_code(Member, s=db)
2023-02-22 12:43:39 +01:00
member = Member(room=room, user=user, anonymous=anonymous, waiting=waiting,
2022-09-26 10:04:02 +02:00
id_code=member_id)
2023-02-22 12:43:39 +01:00
member.online = True
2022-09-26 10:04:02 +02:00
db.add(member)
db.commit()
db.refresh(member)
return member
2023-02-22 12:43:39 +01:00
def get_or_create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None,
waiting: bool = False, db: Session):
member = user is not None and get_member_from_user(user.id, room.id, db)
2022-09-26 10:04:02 +02:00
if member is not None and member is not False:
return member
2023-02-22 12:43:39 +01:00
member = create_member(room=room, user=user,
anonymous=anonymous, waiting=waiting, db=db)
2022-09-26 10:04:02 +02:00
2022-09-25 22:32:19 +02:00
def connect_member(member: Member, db: Session):
member.online = True
db.add(member)
db.commit()
db.refresh(member)
return member
def disconnect_member(member: Member, db: Session):
if member.waiting == False:
member.online = False
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
if member.anonymous is not None:
2023-02-22 12:43:39 +01:00
change_anonymous_clientId(member.anonymous, db)
2022-09-25 22:32:19 +02:00
db.add(member)
db.commit()
db.refresh(member)
return member
else:
db.delete(member)
db.commit()
return member
def validate_username(username: str, room: Room, db: Session = Depends(get_session)):
if len(username) > 20:
return None
members = select(Member.anonymous_id).where(
Member.room_id == room.id, Member.anonymous_id != None)
anonymous = select(Anonymous).where(
col(Anonymous.id).in_(members), Anonymous.username == username)
username_anonymous = db.exec(anonymous).first()
return None if username_anonymous is not None else username
def create_anonymous_member(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
return None
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name="reconnect_code")
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, anonymous=anonymous, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
return member
2022-10-10 01:34:38 +02:00
2023-02-22 12:43:39 +01:00
2022-09-26 10:04:02 +02:00
def create_anonymous(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
return None
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name="reconnect_code")
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
db.add(anonymous)
db.commit()
db.refresh(anonymous)
return anonymous
2022-09-25 22:32:19 +02:00
2023-02-22 12:43:39 +01:00
2022-09-26 10:04:02 +02:00
def check_user_in_room(user_id: int, room_id: int, db: Session):
2023-02-22 12:43:39 +01:00
user = db.exec(select(Member).where(Member.user_id ==
user_id, Member.room_id == room_id)).first()
2022-09-26 10:04:02 +02:00
return user
2022-09-25 22:32:19 +02:00
2023-02-22 12:43:39 +01:00
2022-09-25 22:32:19 +02:00
def create_user_member(user: User, room: Room, db: Session):
member = get_member_from_user(user.id, room.id, db)
if member is not None:
return None
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, user=user, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
return member
2023-02-22 12:43:39 +01:00
2022-09-25 22:32:19 +02:00
def create_anonymous_waiter(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
return None
reconnect_code = generate_unique_code(
Anonymous, s=db, field_name="reconnect_code")
anonymous = Anonymous(username=username, reconnect_code=reconnect_code)
member_id = generate_unique_code(Member, s=db)
member = Member(room=room, anonymous=anonymous,
waiting=True, id_code=member_id)
db.add(member)
db.commit()
db.refresh(member)
return member
2023-02-22 12:43:39 +01:00
2022-09-25 22:32:19 +02:00
def create_user_waiter(user: User, room: Room, db: Session):
member = get_member_from_user(user.id, room.id, db)
if member is not None:
return member
2022-09-26 10:04:02 +02:00
member = create_member(room=room, user=user, waiting=True,
db=db)
2022-09-25 22:32:19 +02:00
return member
2023-02-22 12:43:39 +01:00
2022-09-25 22:32:19 +02:00
def get_waiter(waiter_code: str, db: Session):
2022-09-26 10:04:02 +02:00
return db.exec(select(Member).where(Member.id_code == waiter_code, Member.waiting == True)).first()
def get_member(id_code: str, room_id: str, db: Session):
return db.exec(select(Member).where(Member.id_code == id_code, Member.room_id == room_id)).first()
2022-09-25 22:32:19 +02:00
def delete_member(member: Member, db: Session):
db.delete(member)
db.commit()
return None
2022-09-26 10:04:02 +02:00
2022-09-25 22:32:19 +02:00
def accept_waiter(member: Member, db: Session):
member.waiting = False
member.waiter_code = None
2023-02-22 12:43:39 +01:00
member.online = True
2022-09-25 22:32:19 +02:00
db.add(member)
db.commit()
db.refresh(member)
return member
def refuse_waiter(member: Member, db: Session):
db.delete(member)
db.commit()
return None
2022-09-26 10:04:02 +02:00
2022-09-25 22:32:19 +02:00
def leave_room(member: Member, db: Session):
2022-09-26 10:04:02 +02:00
db.delete(member)
db.commit()
return None
2023-02-22 12:43:39 +01:00
def serialize_member(member: Member, private: bool = False, admin: bool = False,
m2: Member | None = None) -> MemberRead | Waiter:
2022-09-26 10:04:02 +02:00
member_obj = member.user or member.anonymous
2023-02-22 12:43:39 +01:00
print("OHLA", member_obj, private, member.user_id == None)
if not member.waiting:
return MemberRead(username=member_obj.username, online=member.online,
clientId=str(member_obj.clientId) if (private == True and member.user_id == None) else "",
reconnect_code=getattr(member_obj, "reconnect_code", "") if (admin or m2 == member) else "",
isUser=member.user_id != None, isAdmin=member.is_admin, id_code=member.id_code).dict()
if member.waiting:
2022-09-26 10:04:02 +02:00
return Waiter(username=member_obj.username, waiter_id=member.id_code).dict()
2022-10-10 01:34:38 +02:00
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
def serialize_parcours_short(parcours: Parcours, member: Member, db: Session):
2023-02-22 12:43:39 +01:00
challenger = getChallenger(parcours, member, db)
return ParcoursReadShort(name=parcours.name, id_code=parcours.id_code, best_note=challenger.best,
validated=challenger.validated)
2022-10-10 01:34:38 +02:00
def serialize_challenge(challenge: Challenge):
2023-02-22 12:43:39 +01:00
return Challenges(
name=challenge.challenger.user.username if challenge.challenger.user is not None else challenge.challenger.anonymous.username,
value=Note(note=challenge.note, time=challenge.time), isCorriged=challenge.isCorriged,
canCorrige=challenge.data is not None)
2022-10-10 01:34:38 +02:00
def serialize_room(room: Room, member: Member, db: Session):
2023-02-22 12:43:39 +01:00
return RoomInfo(**room.dict(), parcours=[serialize_parcours_short(p, member, db) for p in room.parcours],
members=[serialize_member(m, admin=member.is_admin, m2=member) for m in room.members])
def getUsername(m: Member):
return m.user.username if m.user is not None else m.anonymous.username
def getChallengerInfo(c: Challenge, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id ==
c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first()
if challenger is not None:
member = challenger.member
return {"name": getUsername(member), "id_code": member.id_code}
def getChallenges(c: Challenger, db: Session):
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == c.member_id,
Challenge.challenger_pid == c.parcours_id)).all()
return challenges
def getTops(p: Parcours, db: Session):
tops = db.exec(select(Challenge).where(Challenge.parcours_id == p.id_code).order_by(
col(Challenge.mistakes), col(Challenge.time)).limit(3)).all()
tops = [{"challenger": getChallengerInfo(
t, db), "mistakes": t.mistakes, "time": t.time} for t in tops]
return tops
def getAvgTops(p: Parcours, db: Session):
avgTop = db.exec(select(Challenger).where(Challenger.parcours_id ==
p.id).order_by(col(Challenger.avg)).limit(3)).all()
avgTop = [{"id_code": t.member.id_code, "avg": t.avg,
"name": getUsername(t.member)} for t in avgTop]
return avgTop
def getRank(c: Challenger, p: Parcours, db: Session):
noteRank = db.exec(select([func.count(Challenge.id)]).where(Challenge.parcours_id == p.id_code).order_by(
col(Challenge.mistakes), col(Challenge.time)).where(Challenge.mistakes <= c.best,
Challenge.time < c.best_time)).one()
return noteRank + 1
def getAvgRank(c: Challenger, p: Parcours, db: Session):
avgRank = db.exec(select([func.count(Challenger.member_id)]).where(
Challenger.parcours_id == p.id).order_by(col(Challenger.avg)).where(Challenger.avg < c.avg)).one()
return avgRank + 1
def getMemberRank(m: Member, p: Parcours, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
if challenger is None or challenger.best is None:
return None
return getRank(challenger, p, db)
def getMemberAvgRank(m: Member, p: Parcours, db: Session):
challenger = db.exec(select(Challenger).where(Challenger.member_id == m.id)).first()
print('CHALLE', challenger)
if challenger is None or challenger.avg is None:
return None
return getAvgRank(challenger, p, db)
def serialize_parcours(parcours: Parcours, member: Member, db: Session):
tops = getTops(parcours, db)
avgTop = getAvgTops(parcours, db)
challenger = db.exec(select(Challenger).where(
Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first()
noteRank = None
avgRank = None
pb = None
if challenger is not None and challenger.avg is not None and challenger.best is not None:
noteRank = getRank(challenger, parcours, db)
avgRank = getAvgRank(challenger, parcours, db)
pb = {"mistakes": challenger.best, "time": challenger.best_time}
statement = select(Challenger).where(Challenger.parcours_id == parcours.id)
if not member.is_admin:
statement = statement.where(Challenger.member_id == member.id)
challengers = db.exec(statement).all()
challs = {c.member.id_code: {
"challenger": {"id_code": c.member.id_code, "name": getUsername(c.member)},
# 'validated': chall.mistakes <= parcours.max_mistakes
"challenges": [Challenges(**{**chall.dict(), "canCorrige": chall.data != []}) for chall in getChallenges(c, db)]
} for c in challengers}
return {**parcours.dict(), "pb": pb, "tops": tops, "challenges": challs, "rank": noteRank, "memberRank": avgRank,
"validated": challenger.validated if challenger != None else False, "ranking": avgTop}
tops = []
challs = {}
challenges = sorted(parcours.challenges, key=lambda x: (
x.note['value'], x.time), reverse=True)
memberRank = None
rank = None
pb = None
validated = False
total = 0
for i, chall in enumerate(challenges):
total += chall.note['value']
id = chall.challenger.id_code
name = chall.challenger.user.username if chall.challenger.user_id != None else chall.challenger.anonymous.username
if i <= 2:
tops.append({"challenger": {"id_code": id, "name": name},
"note": chall.note, "time": chall.time})
if id == member.id_code:
if challs.get(id) is None:
rank = i + 1
memberRank = len(challs) + 1
pb = {"note": chall.note, "time": chall.time}
if validated is False and chall.validated:
validated = True
if member.is_admin or chall.challenger.id_code == member.id_code:
t = challs.get(id, {"total": 0})['total']
challs[id] = {"challenger": {"id_code": id, "name": name
}, "challenges": [*challs.get(id, {'challenges': []})['challenges'],
Challenges(
**{**chall.dict(), "canCorrige": chall.data != []})],
"total": t + chall.note['value']}
topMembers = [{**c['challenger'], "avg": c['total'] /
len(c['challenges'])} for id, c in challs.items()]
topMembers.sort(key=lambda x: x['avg'], reverse=True)
return {**parcours.dict(), "tops": tops, "challenges": challs, "rank": rank, "memberRank": memberRank, "pb": pb,
"validated": validated,
'avg': None if len(parcours.challenges) == 0 else round(total / len(parcours.challenges), 2),
"ranking": topMembers}
2022-10-10 01:34:38 +02:00
def change_anonymous_clientId(anonymous: Anonymous, db: Session):
anonymous.clientId = uuid.uuid4()
db.add(anonymous)
db.commit()
db.refresh(anonymous)
return anonymous
2023-02-22 12:43:39 +01:00
# Parcours
from services.io import add_fast_api_root
from generateur.generateur_main import generate_from_path, parseGeneratorOut
def countInput(ex: Exercice, q: int):
exo = parseGeneratorOut(generate_from_path(add_fast_api_root(
ex.exo_source), 1, "web"))
return len(exo.inputs) * q
class ExoToCount(BaseModel):
ex: Exercice
q: int
def getTotal(exs: list[ExoToCount]):
total = 0
for e in exs:
total += countInput(e.ex, e.q)
return total
def validate_exercices(exos: List[ExercicesCreate], db: Session):
exercices = db.exec(select(Exercice).where(Exercice.web == True).where(
col(Exercice.id_code).in_([e.exercice_id for e in exos]))).all()
2022-10-10 01:34:38 +02:00
exos_id_list = [e.exercice_id for e in exos]
2023-02-22 12:43:39 +01:00
# exoToCountList = [ExoToCount(ex=e, q=q) for e, q in zip(exercices, [c.quantity for c in exos])]
2022-10-10 01:34:38 +02:00
exercices.sort(key=lambda e: exos_id_list.index(e.id_code))
2023-02-22 12:43:39 +01:00
return [Exercices(exercice_id=e.id_code, name=e.name,
quantity=[ex for ex in exos if ex.exercice_id == e.id_code][0].quantity,
examples=e.examples).dict() for e in exercices]
2022-10-10 01:34:38 +02:00
2023-02-22 12:43:39 +01:00
def create_parcours_db(parcours: ParcoursCreate, room_id: int, db: Session):
2022-10-10 01:34:38 +02:00
exercices = validate_exercices(parcours.exercices, db)
if len(exercices) == 0:
return "Veuillez entrer au moins un exercice valide"
id_code = generate_unique_code(Parcours, s=db)
2023-02-22 12:43:39 +01:00
parcours_obj = Parcours(
**{**parcours.dict(), "exercices": exercices}, room_id=room_id, id_code=id_code)
2022-10-10 01:34:38 +02:00
db.add(parcours_obj)
db.commit()
db.refresh(parcours_obj)
return parcours_obj
def deleteParcoursRelated(parcours: Parcours, db: Session):
db.exec(delete(Challenge).where(Challenge.parcours_id == parcours.id_code))
2023-02-22 12:43:39 +01:00
db.exec(delete(TmpCorrection).where(
TmpCorrection.parcours_id == parcours.id_code))
db.exec(delete(Challenger).where(Challenger.parcours_id == parcours.id))
2022-10-10 01:34:38 +02:00
db.commit()
2023-02-22 12:43:39 +01:00
def change_challengers_validation(p: Parcours, validation: int, db: Session):
challengers = db.exec(select(Challenger).where(
Challenger.parcours_id == p.id)).all()
challs = []
for c in challengers:
validated = c.best <= validation
if validated != c.validated:
c.validated = validated
challs.append(c)
db.bulk_save_objects(challs)
db.commit()
def change_challenges_validation(p: Parcours, validation: int, db: Session):
print('cHANGE')
challenges = db.exec(select(Challenge).where(
Challenge.parcours_id == p.id_code)).all()
print('CHALLS', challenges)
challs = []
for c in challenges:
validated = c.mistakes <= validation
print('CHAL', validated, c.validated, c)
if validated != c.validated:
c.validated = validated
challs.append(c)
db.bulk_save_objects(challs)
db.commit()
def changeValidation(p: Parcours, validation: int, db: Session):
change_challengers_validation(p, validation, db)
change_challenges_validation(p, validation, db)
def compareExercices(old: list[Exercices], new: list[ExercicesCreate]):
old = [{"id": o['exercice_id'], "q": o['quantity']} for o in old]
new = [{"id": n.exercice_id, "q": n.quantity} for n in new]
return old == new
2022-10-10 01:34:38 +02:00
def update_parcours_db(parcours: ParcoursCreate, parcours_obj: Parcours, db: Session):
2023-02-22 12:43:39 +01:00
update_challenges = False
if not compareExercices(parcours_obj.exercices, parcours.exercices):
exercices = validate_exercices(parcours.exercices, db)
if len(exercices) == 0:
return "Veuillez entrer au moins un exercice valide"
deleteParcoursRelated(parcours_obj, db)
update_challenges = True
parcours_obj.exercices = exercices
if parcours_obj.max_mistakes != parcours.max_mistakes:
changeValidation(parcours_obj, parcours.max_mistakes, db)
parcours_obj.name = parcours.name
parcours_obj.time = parcours.time
parcours_obj.max_mistakes = parcours.max_mistakes
2022-10-10 01:34:38 +02:00
db.add(parcours_obj)
db.commit()
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
db.refresh(parcours_obj)
2023-02-22 12:43:39 +01:00
return parcours_obj, update_challenges
2022-10-10 01:34:38 +02:00
def delete_parcours_db(parcours: Parcours, db: Session):
db.delete(parcours)
db.commit()
return True
class CorrigedChallenge(BaseModel):
data: List[List[CorrigedGeneratorOut]]
2023-02-22 12:43:39 +01:00
mistakes: int
2022-10-10 01:34:38 +02:00
isCorriged: bool
2023-02-22 12:43:39 +01:00
def create_tmp_correction(data: List[CorrigedData], parcours_id: str, member: Member, db: Session):
2022-10-10 01:34:38 +02:00
code = generate_unique_code(TmpCorrection, s=db)
tmpCorr = TmpCorrection(data=data, id_code=code,
member=member, parcours_id=parcours_id)
db.add(tmpCorr)
db.commit()
db.refresh(tmpCorr)
2023-02-22 12:43:39 +01:00
return tmpCorr
2022-10-10 01:34:38 +02:00
2023-02-22 12:43:39 +01:00
def validate_challenge_input(obj: List[CorrectionData], corr: TmpCorrection):
2022-10-10 01:34:38 +02:00
data = corr.data
if len(obj) != len(data):
return False
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
2023-02-22 12:43:39 +01:00
print('EXO', exo)
print('EXO', exo.data)
if len(exo.data) != len(exo_corr['data']):
2022-10-10 01:34:38 +02:00
return
2023-02-22 12:43:39 +01:00
zipped = zip(exo_corr['data'], exo.data)
same = all([e['calcul'] == f.calcul and len(e['inputs'])
== len(f.inputs) for e, f in zipped])
2022-10-10 01:34:38 +02:00
if not same:
return False
return True
2023-02-22 12:43:39 +01:00
def validate_challenge_correction(obj: List[CorrigedData], chall: Challenge):
2022-10-10 01:34:38 +02:00
data = chall.data
if len(obj) != len(data):
return False
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
2023-02-22 12:43:39 +01:00
if len(exo.data) != len(exo_corr['data']):
2022-10-10 01:34:38 +02:00
return
2023-02-22 12:43:39 +01:00
zipped = zip(exo_corr['data'], exo.data)
same = all([e['calcul'] == f.calcul and len(e['inputs'])
== len(f.inputs) for e, f in zipped])
2022-10-10 01:34:38 +02:00
if not same:
return False
return True
def corrige_challenge(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection) -> CorrigedChallenge:
2023-02-22 12:43:39 +01:00
if validate_challenge_input(obj, corr) is False:
2022-10-10 01:34:38 +02:00
return None
2023-02-22 12:43:39 +01:00
2022-10-10 01:34:38 +02:00
data = corr.data
note = 0
total = 0
isCorriged = True
2023-02-22 12:43:39 +01:00
mistakes = 0
2022-10-10 01:34:38 +02:00
for i in range(len(data)):
2023-02-22 12:43:39 +01:00
exo_corr = data[i]["data"]
exo = obj[i].data
2022-10-10 01:34:38 +02:00
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
for e, f in zipped:
2023-02-22 12:43:39 +01:00
print("HO\n\n")
2022-10-10 01:34:38 +02:00
for k, l in zip(e['inputs'], f.inputs):
k["value"] = str(l.value)
total += 1
if k['correction'] is None:
isCorriged = False
2023-02-22 12:43:39 +01:00
k['valid'] = None
elif str(k["correction"]) == str(l.value):
k['valid'] = True
2022-10-10 01:34:38 +02:00
note += 1
2023-02-22 12:43:39 +01:00
else:
k['valid'] = False
mistakes += 1
return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged}
return {"data": data, "mistakes": mistakes, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
def change_correction(obj: List[CorrigedData], chall: Challenge) -> CorrigedChallenge:
2022-10-10 01:34:38 +02:00
if validate_challenge_correction(obj, chall) is False:
return None
data = deepcopy(chall.data)
note = 0
total = 0
isCorriged = True
2023-02-22 12:43:39 +01:00
mistakes = 0
2022-10-10 01:34:38 +02:00
for i in range(len(data)):
2023-02-22 12:43:39 +01:00
exo_corr = data[i]['data']
exo = obj[i].data
2022-10-10 01:34:38 +02:00
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
for e, f in zipped:
for k, l in zip(e['inputs'], f.inputs):
2023-02-22 12:43:39 +01:00
k["correction"] = l.correction
k["valid"] = l.valid
2022-10-10 01:34:38 +02:00
total += 1
2023-02-22 12:43:39 +01:00
if k['correction'] is None and l.valid is None:
2022-10-10 01:34:38 +02:00
isCorriged = False
2023-02-22 12:43:39 +01:00
if l.valid is True:
2022-10-10 01:34:38 +02:00
note += 1
2023-02-22 12:43:39 +01:00
else:
mistakes += 1
return {"data": data, "mistakes": mistakes, "isCorriged": isCorriged}
2022-10-10 01:34:38 +02:00
return {"data": data, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
2023-02-22 12:43:39 +01:00
def getChallenger(parcours: Parcours, member: Member, db: Session):
challenger = db.exec(select(Challenger).where(
Challenger.member_id == member.id, Challenger.parcours_id == parcours.id)).first()
if challenger is None:
return Challenger(parcours_id=parcours.id, member_id=member.id)
return challenger
def ChallengerFromChallenge(c: Challenge, db: Session):
challenger = db.exec(select(Challenger).where(
Challenger.member_id == c.challenger_mid, Challenger.parcours_id == c.challenger_pid)).first()
return challenger
def checkValidated(challenger: Challenger, db: Session, challenge: Challenge | None = None):
challenges = db.exec(select(Challenge).where(Challenge.challenger_mid == challenger.member_id,
Challenge.challenger_pid == challenger.parcours_id,
Challenge.validated == True)).all()
if challenge is not None:
challenges = [c for c in challenges if c.id != challenge.id]
return len(challenges) != 0
2022-10-10 01:34:38 +02:00
2023-02-22 12:43:39 +01:00
def create_challenge(data: List[CorrigedData], challenger: Member, parcours: Parcours, time: int, mistakes: int,
isCorriged: bool, db: Session):
challenger_obj: Challenger = getChallenger(parcours, challenger, db)
validated = mistakes <= parcours.max_mistakes
challenge = Challenge(data=data, challenger_pid=challenger_obj.parcours_id, challenger_mid=challenger_obj.member_id,
parcours=parcours, time=time, mistakes=mistakes, isCorriged=isCorriged,
id_code=generate_unique_code(Challenge, s=db), validated=validated)
if (challenger_obj.best is not None and challenger_obj.best > mistakes) or challenger_obj.best is None:
challenger_obj.best = mistakes
challenger_obj.best_time = time
challenges = db.exec(select([func.count(Challenge.id)]).where(
Challenge.challenger_mid == challenger_obj.member_id, Challenge.challenger_pid == parcours.id)).one()
if validated and challenger_obj.validated is False:
challenger_obj.validated = True
avg = challenger_obj.avg
if avg is None:
avg = 0
challenger_obj.avg = (avg *
(challenges - 1) + mistakes) / (challenges)
2022-10-10 01:34:38 +02:00
db.add(challenge)
2023-02-22 12:43:39 +01:00
db.add(challenger_obj)
2022-10-10 01:34:38 +02:00
db.commit()
db.refresh(challenge)
2023-02-22 12:43:39 +01:00
db.refresh(challenger_obj)
print('RETURN,', challenge, challenger_obj)
return challenge, challenger_obj
2022-10-10 01:34:38 +02:00
def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session):
2023-02-22 12:43:39 +01:00
challenger = ChallengerFromChallenge(challenge, db)
challengesCount = len(getChallenges(challenger, db))
avg = challenger.avg * challengesCount - challenge.mistakes
parcours = challenge.parcours
if challenger.best > corriged['mistakes']:
challenger.best = corriged['mistakes']
challenger.best_time = challenge.time
validated = corriged['mistakes'] <= parcours.max_mistakes
challenge.validated = validated
if challenger.validated == False and validated:
challenger.validated = True
elif challenger.validated == True and not validated:
challenger.validated = checkValidated(challenger, db, challenge)
challenger.avg = (avg + corriged['mistakes']) / challengesCount
2022-10-10 01:34:38 +02:00
challenge.data = corriged['data']
2023-02-22 12:43:39 +01:00
challenge.mistakes = corriged['mistakes']
2022-10-10 01:34:38 +02:00
challenge.isCorriged = corriged['isCorriged']
2023-02-22 12:43:39 +01:00
# challenge.validated = corriged['mistakes'] <= parcours.max_mistakes
2022-10-10 01:34:38 +02:00
db.add(challenge)
2023-02-22 12:43:39 +01:00
db.add(challenger)
2022-10-10 01:34:38 +02:00
db.commit()
db.refresh(challenge)
2023-02-22 12:43:39 +01:00
db.refresh(challenger)
2022-10-10 01:34:38 +02:00
2023-02-22 12:43:39 +01:00
return challenge, challenger
2022-10-10 01:34:38 +02:00
# Dependencies
def check_room(room_id: str, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
return room
def get_room(room_id, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Salle introuvable")
return room
2023-02-22 12:43:39 +01:00
def get_member_dep(room: Room = Depends(get_room), user: User = Depends(get_current_user_optional),
clientId: str | None = Query(default=None), db: Session = Depends(get_session)):
2022-10-10 01:34:38 +02:00
if user is None and clientId is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
if user is not None:
member = get_member_from_user(user.id, room.id, db)
2023-02-22 12:43:39 +01:00
elif clientId is not None:
2022-10-10 01:34:38 +02:00
member = get_member_from_clientId(clientId, room.id, db)
if member is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous n'êtes pas dans cette salle")
return member
def check_admin(member: Member = Depends(get_member_dep)):
if member.is_admin is False:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous devez être administrateur pour faire cela")
return member
def get_parcours(parcours_id: str, room: Room = Depends(get_room), db: Session = Depends(get_session)):
room = db.exec(select(Parcours).where(Parcours.id_code ==
parcours_id, Parcours.room_id == room.id)).first()
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Parcours introuvable")
return room
def get_exercices(parcours: Parcours = Depends(get_parcours), db: Session = Depends(get_session)):
exercices = db.exec(select(Exercice).where(col(Exercice.id_code).in_(
[e['exercice_id'] for e in parcours.exercices]))).all()
return [{"exercice": e, "quantity": [q for q in parcours.exercices if q['exercice_id'] == e.id_code][0]['quantity']} for e in exercices]
def get_correction(correction_id: str, parcours_id: str, member: Member = Depends(get_member_dep), db: Session = Depends(get_session)):
tmpCorr = db.exec(select(TmpCorrection).where(
TmpCorrection.id_code == correction_id, TmpCorrection.parcours_id == parcours_id)).first()
if tmpCorr is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Correction introuvable")
if member != tmpCorr.member:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="It's not your challenge")
return tmpCorr
2023-02-22 12:43:39 +01:00
def get_challenge(challenge_id: str, db: Session = Depends(get_session)):
2022-10-10 01:34:38 +02:00
challenge = db.exec(select(Challenge).where(
2023-02-22 12:43:39 +01:00
Challenge.id_code == challenge_id)).first()
2022-10-10 01:34:38 +02:00
if challenge is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge introuvable")
if challenge.data == []:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Impossible de corriger ce challenge")
return challenge