This commit is contained in:
Kilton937342 2022-10-10 01:34:38 +02:00
parent b639c5a88e
commit 936931f047
31 changed files with 2003 additions and 559 deletions

View File

@ -8,12 +8,21 @@ from database.exercices.models import Exercice
from main import app
from database.db import get_session
import json
import pydantic.json
def _custom_json_serializer(*args, **kwargs) -> str:
"""
Encodes json in the same way that pydantic does.
"""
return json.dumps(*args, default=pydantic.json.pydantic_encoder, **kwargs)
@pytest.fixture(name="session")
def session_fixture():
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool, echo=False
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool, echo=False, json_serializer=_custom_json_serializer
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:

View File

@ -1,10 +1,19 @@
import pydantic.json
import json
from sqlmodel import SQLModel, create_engine, Session, select
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=False, connect_args={"check_same_thread": False})
def _custom_json_serializer(*args, **kwargs) -> str:
"""
Encodes json in the same way that pydantic does.
"""
print('JSON SERIALIZATION')
return json.dumps(*args, default=pydantic.json.pydantic_encoder, **kwargs)
engine = create_engine(sqlite_url, echo=False, connect_args={"check_same_thread": False}, json_serializer=_custom_json_serializer)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View File

@ -20,6 +20,7 @@ class FileField(str, metaclass=FileFieldMeta):
@classmethod
def validate(cls, value: str | IO, values):
print('VALID FILE', values)
upload_root = get_or_create_dir(
add_fast_api_root(cls.upload_root))
if not isinstance(value, str):

View File

@ -1,21 +1,26 @@
from fastapi import Query
from fastapi import Query, HTTPException, status
import os
import shutil
from typing import IO, List
from sqlmodel import Session, select, or_
from generateur.generateur_main import generate_from_data, generate_from_path
from database.auth.models import User
from database.db import get_session
from fastapi import Depends
from database.exercices.models import ExerciceCreate, Exercice, ExerciceEdit, ExerciceRead, ExercicesTagLink, Tag, TagCreate
from services.auth import get_current_user
from database.exercices.models import ExampleEnum, ExerciceCreate, Exercice, ExerciceEdit, ExerciceRead, ExercicesTagLink, Tag, TagCreate, Supports
from services.auth import get_current_user, get_current_user_optional
from services.database import generate_unique_code
from services.io import add_fast_api_root, get_ancestor, get_or_create_dir
def create_exo_db(exercice: ExerciceCreate, user: User, exo_source: IO, db: Session):
def create_exo_db(exercice: ExerciceCreate, user: User,supports: Supports, exo_source: IO, db: Session):
example = {
"type": ExampleEnum.csv if supports['csv'] == True else ExampleEnum.web if supports['web'] == True else None,
"data": generate_from_data(exo_source.read(), 3, "csv" if supports['csv'] == True else "web" if supports['web'] == True else None, True) if supports['csv'] == True == True or supports['web'] == True == True else None
}
exo_db = Exercice(**exercice.dict(exclude_unset=True),
author_id=user.id, id_code=generate_unique_code(Exercice, db), exo_source=exo_source)
author_id=user.id, id_code=generate_unique_code(Exercice, db), exo_source=exo_source, **supports, examples=example)
db.add(exo_db)
db.commit()
@ -57,14 +62,23 @@ def clone_exo_db(exercice: Exercice, user: User, db: Session):
db.refresh(new_exo)
return new_exo
def update_exo_db(old_exo: Exercice, new_exo: ExerciceEdit, exo_source: IO | None, db: Session):
def update_exo_db(old_exo: Exercice, new_exo: ExerciceEdit,supports: Supports, exo_source: IO | None, db: Session):
exo_data = new_exo.dict(exclude_unset=True, exclude_none=True)
for key, value in exo_data.items():
setattr(old_exo, key, value)
old_exo.csv = supports['csv']
old_exo.pdf = supports['pdf']
old_exo.web = supports['web']
example = {
"type": ExampleEnum.csv if supports['csv'] == True else ExampleEnum.web if supports['web'] == True else None,
"data": generate_from_data(exo_source.read(), 3, "csv" if supports['csv'] == True else "web" if supports['web'] == True else None, True) if supports['csv'] == True == True or supports['web'] == True == True else None
}
old_exo.examples = example
if exo_source:
os.remove(add_fast_api_root(old_exo.exo_source))
old_exo.exo_source = exo_source
@ -123,12 +137,18 @@ def parse_exo_tags(exo_id: int, user_id: int, db: Session):
# Dependencies
def get_exo_dependency(id_code: str, db: Session = Depends(get_session)):
with db.no_autoflush:
exo = db.exec(select(Exercice).where(
Exercice.id_code == id_code)).first()
return exo
exo = db.exec(select(Exercice).where(
Exercice.id_code == id_code)).first()
return exo
def check_private(exo: Exercice = Depends(get_exo_dependency), user: User = Depends(get_current_user_optional)):
if user is not None and exo.author_id == user.id:
return exo
if exo.private is True:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Cet exercice est privé')
return exo
def check_exercice_author(exo: Exercice | None = Depends(get_exo_dependency), user: User = Depends(get_current_user)):
if not exo:
return None
@ -169,4 +189,5 @@ def serialize_exo(*, exo: Exercice, user_id: User = None, db: Session):
tags = parse_exo_tags(exo_id=exo.id, user_id=user_id,
db=db) if user_id is not None else []
is_author = user_id is not None and check_author(exo=exo, user_id=user_id)
return ExerciceRead(**exo.dict(), author=exo.author, original=exo.original, tags=tags, is_author=is_author)
return ExerciceRead(**exo.dict(), author=exo.author, original=exo.original, tags=tags, is_author=is_author, supports={**exo.dict()})

View File

@ -1,10 +1,10 @@
from sqlmodel import select
from sqlmodel import JSON, Column, select
from sqlalchemy.inspection import inspect
from enum import Enum
import os
from pydantic import BaseModel
from pydantic import validator, root_validator
from generateur.generateur_main import Generateur
from generateur.generateur_main import generate_from_path
from services.exoValidation import get_support_from_path
from services.io import add_fast_api_root, get_filename_from_path
from services.schema import as_form
@ -18,6 +18,23 @@ if TYPE_CHECKING:
from database.auth.models import User
class ExampleEnum(Enum):
csv = 'csv'
pdf = 'pdf'
web = 'web'
undisponible = 'undisponible'
class GeneratorOutput(BaseModel):
calcul: str
correction: str | None = None
class Example(BaseModel):
type: ExampleEnum = ExampleEnum.undisponible
data: List[GeneratorOutput] | None = None
class ExercicesTagLink(SQLModel, table=True):
exercice_id: Optional[int] = Field(
default=None, foreign_key='exercice.id', primary_key=True)
@ -29,9 +46,14 @@ class ExerciceBase(SQLModel):
name: str = Field(max_length=50, index=True)
consigne: Optional[str] = Field(max_length=200, default=None)
private: Optional[bool] = Field(default=False)
class Supports(SQLModel):
csv: bool
pdf: bool
web: bool
class Exercice(ExerciceBase, table=True):
class Exercice(ExerciceBase,Supports, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: str = Field(unique=True, index=True)
@ -49,8 +71,10 @@ class Exercice(ExerciceBase, table=True):
tags: List['Tag'] = Relationship(
back_populates='exercices', link_model=ExercicesTagLink)
examples: Optional[Example] = Field(
sa_column=Column(JSON), default={})
def dict_relationship(self, *, exclude: List[str], include: List[str]):
relationships = inspect(self.__class__).relationships.items()
relationsData = {rname: getattr(self, rname) for rname, rp in relationships if rname not in exclude and (rname in include if len(include) != 0 else True)}
@ -107,28 +131,8 @@ class ExerciceOrigin(SQLModel):
name: str
class ExoSupport(BaseModel):
pdf: bool
csv: bool
web: bool
class ExampleEnum(Enum):
csv = 'csv'
pdf = 'pdf'
web = 'web'
undisponible = 'undisponible'
class GeneratorOutput(BaseModel):
calcul: str
correction: str | None = None
class Example(BaseModel):
type: ExampleEnum = ExampleEnum.undisponible
examples: List[GeneratorOutput] | None = None
class Author(SQLModel):
username: str
@ -140,6 +144,9 @@ class Author(SQLModel):
def get_source_path_from_name_and_id(name: str, id_code: str):
return f'/uploads/{id_code}/{name}'
class ExerciceRead(ExerciceBase):
id_code: str
author: Author
@ -147,33 +154,13 @@ class ExerciceRead(ExerciceBase):
tags: List[TagRead] = None
exo_source: str
supports: ExoSupport = None
examples: Example = None
is_author: bool = None
supports: Supports
@validator('supports', always=True, pre=True)
def get_supports(cls, value, values):
exo_source = get_source_path_from_name_and_id(values['exo_source'], values['id_code'])
exo_source = add_fast_api_root(exo_source)
if os.path.exists(exo_source):
support_compatibility = get_support_from_path(
exo_source)
return support_compatibility
return None
@validator('examples', always=True)
def get_examples(cls, value, values):
if values.get('supports') == None:
return {}
supports = values.get('supports').dict()
exo_source = get_source_path_from_name_and_id(
values['exo_source'], values['id_code'])
return {
"type": ExampleEnum.csv if supports['csv'] == True else ExampleEnum.web if supports['web'] == True else None,
"data": Generateur(add_fast_api_root(exo_source), 3, "csv" if supports['csv'] == True else "web" if supports['web'] == True else None, True) if supports['csv'] == True == True or supports['web'] == True == True else None
}
@validator('exo_source')
def get_exo_source_name(cls, value, values):

View File

@ -1,15 +1,17 @@
from fastapi import Depends
from sqlmodel import Session, select, col
from services.auth import get_current_user_optional
from services.misc import noteOn20
from copy import deepcopy
from typing import Dict, List
import uuid
from fastapi import Body, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import Session, delete, select, col, table
from database.db import get_session
from database.room.models import Anonymous, Member, Room, RoomCreate, Waiter, MemberRead
from database.room.models import Anonymous, Challenge, Challenges, CorrigedGeneratorOut, Exercices, ExercicesCreate, Member, Note, Parcours, ParcoursCreate, ParcoursReadShort, ParsedGeneratorOut, Room, RoomCreate, RoomInfo, RoomRead, TmpCorrection, Waiter, MemberRead
from database.auth.models import User
from services.database import generate_unique_code
from database.auth.crud import get_user_from_token
def check_room(room_id: str, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
return room
from database.exercices.models import Exercice
def create_room_db(*, room: RoomCreate, user: User | None = None, username: str | None = None, db: Session):
@ -36,6 +38,18 @@ def create_room_db(*, room: RoomCreate, user: User | None = None, username: str
return {"room": room_obj, "member": member}
def change_room_name(room: Room, name: str, db: Session):
room.name = name
db.add(room)
db.commit()
db.refresh(room)
return room
def change_room_status(room: Room, public: bool, db: Session):
room.public = public
db.add(room)
db.commit()
db.refresh(room)
return room
def get_member_from_user(user_id: int, room_id: int, db: Session):
member = db.exec(select(Member).where(Member.room_id ==
@ -72,6 +86,19 @@ def get_anonymous_from_code(reconnect_code: str, db: Session):
Anonymous.reconnect_code == reconnect_code)).first()
return anonymous
def get_anonymous_from_clientId(clientId: str, db: Session):
anonymous = db.exec(select(Anonymous).where(
Anonymous.clientId == clientId)).first()
return anonymous
def get_member_from_clientId(clientId: str, room_id: int, db: Session):
anonymous = get_anonymous_from_clientId(clientId, db)
if anonymous is None:
return None
member = get_member_from_anonymous(anonymous.id, room_id, db)
return member
def create_member(*, room: Room, user: User | None = None, anonymous: Anonymous | None = None, waiting: bool = False, db: Session):
member_id = generate_unique_code(Member, s=db)
@ -89,7 +116,6 @@ def get_or_create_member(*, room: Room, user: User | None = None, anonymous: Ano
return member
member= create_member(room=room, user=user, anonymous=anonymous, waiting=waiting, db=db)
def connect_member(member: Member, db: Session):
member.online = True
@ -102,6 +128,10 @@ def connect_member(member: Member, db: Session):
def disconnect_member(member: Member, db: Session):
if member.waiting == False:
member.online = False
if member.anonymous is not None:
change_anonymous_clientId(member.anonymous,db)
db.add(member)
db.commit()
db.refresh(member)
@ -136,6 +166,7 @@ def create_anonymous_member(username: str, room: Room, db: Session):
db.commit()
db.refresh(member)
return member
def create_anonymous(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
@ -163,7 +194,6 @@ def create_user_member(user: User, room: Room, db: Session):
db.refresh(member)
return member
def create_anonymous_waiter(username: str, room: Room, db: Session):
username = validate_username(username, room, db)
if username is None:
@ -180,9 +210,6 @@ def create_anonymous_waiter(username: str, room: Room, db: Session):
db.refresh(member)
return member
def create_user_waiter(user: User, room: Room, db: Session):
member = get_member_from_user(user.id, room.id, db)
if member is not None:
@ -192,7 +219,6 @@ def create_user_waiter(user: User, room: Room, db: Session):
db=db)
return member
def get_waiter(waiter_code: str, db: Session):
return db.exec(select(Member).where(Member.id_code == waiter_code, Member.waiting == True)).first()
@ -228,9 +254,314 @@ def leave_room(member: Member, db: Session):
return None
def serialize_member(member: Member) -> MemberRead | Waiter:
member_obj = member.user or member.anonymous
if member.waiting == False:
return MemberRead(username=member_obj.username, reconnect_code=getattr(member_obj, "reconnect_code", ""), isUser=member.user_id != None, isAdmin=member.is_admin, id_code=member.id_code).dict()
if member.waiting == True:
return Waiter(username=member_obj.username, waiter_id=member.id_code).dict()
def serialize_parcours_short(parcours: Parcours, member: Member, db: Session):
best_note = db.exec(select(Challenge.note, Challenge.time).where(Challenge.parcours_id == parcours.id, Challenge.challenger_id == member.id).order_by(col(Challenge.note).desc()).limit(1)).first()
note = None
if best_note is not None:
best_note=best_note[0]
note = Note(note=best_note[0], time=best_note[1])
return ParcoursReadShort(**parcours.dict(exclude_unset=True), best_note=note)
def serialize_challenge(challenge: Challenge):
return Challenges(name=challenge.challenger.user.username if challenge.challenger.user is not None else challenge.challenger.anonymous.username, value=Note(note=challenge.note, time=challenge.time), isCorriged=challenge.isCorriged, canCorrige=challenge.data is not None)
def serialize_parcours_short(parcours: Parcours, member: Member, db: Session):
if member.is_member == False:
challenges = db.exec(select(Challenge).where(Challenge.parcours_id == parcours.id, Challenge.challenger_id == member.id)).all()
else:
challenges = db.exec(select(Challenge).where(
Challenge.parcours_id == parcours.id)).all()
challenges = [serialize_challenge(c) for c in challenges]
return Parcours(**parcours.dict(), challenges=challenges)
def serialize_room(room: Room, member: Member, db: Session):
return RoomInfo(**room.dict(), parcours=[serialize_parcours_short(p, member, db) for p in room.parcours], members=[serialize_member(m) for m in room.members])
def change_anonymous_clientId(anonymous: Anonymous, db: Session):
anonymous.clientId = uuid.uuid4()
db.add(anonymous)
db.commit()
db.refresh(anonymous)
return anonymous
#Parcours
def validate_exercices(exos: List[ExercicesCreate], db: Session ):
exercices = db.exec(select(Exercice).where(Exercice.web == True).where(col(Exercice.id_code).in_([e.exercice_id for e in exos]))).all()
exos_id_list = [e.exercice_id for e in exos]
exercices.sort(key=lambda e: exos_id_list.index(e.id_code))
return [Exercices(exercice_id=e.id_code, name=e.name, quantity=[ex for ex in exos if ex.exercice_id == e.id_code][0].quantity).dict() for e in exercices]
def create_parcours_db(parcours: ParcoursCreate,room_id: int, db: Session):
exercices = validate_exercices(parcours.exercices, db)
if len(exercices) == 0:
return "Veuillez entrer au moins un exercice valide"
id_code = generate_unique_code(Parcours, s=db)
parcours_obj = Parcours(**{**parcours.dict(), "exercices": exercices}, room_id=room_id, id_code=id_code)
print(parcours_obj)
db.add(parcours_obj)
db.commit()
db.refresh(parcours_obj)
return parcours_obj
def deleteParcoursRelated(parcours: Parcours, db: Session):
db.exec(delete(Challenge).where(Challenge.parcours_id == parcours.id_code))
db.exec(delete(TmpCorrection).where(TmpCorrection.parcours_id == parcours.id_code))
db.commit()
def update_parcours_db(parcours: ParcoursCreate, parcours_obj: Parcours, db: Session):
exercices = validate_exercices(parcours.exercices, db)
if len(exercices) == 0:
return "Veuillez entrer au moins un exercice valide"
parcours_data = parcours.dict(exclude_unset=True)
for key, value in parcours_data.items():
setattr(parcours_obj, key, value)
parcours_obj.exercices = exercices
db.add(parcours_obj)
db.commit()
deleteParcoursRelated(parcours_obj, db)
db.refresh(parcours_obj)
return parcours_obj
def delete_parcours_db(parcours: Parcours, db: Session):
db.delete(parcours)
db.commit()
return True
class CorrigedChallenge(BaseModel):
data: List[List[CorrigedGeneratorOut]]
note: Note
isCorriged: bool
def create_tmp_correction(data: List[List[CorrigedGeneratorOut]], parcours_id: str, member: Member, db: Session):
code = generate_unique_code(TmpCorrection, s=db)
tmpCorr = TmpCorrection(data=data, id_code=code,
member=member, parcours_id=parcours_id)
db.add(tmpCorr)
db.commit()
db.refresh(tmpCorr)
return tmpCorr
def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session):
challenge.data = corriged['data']
challenge.note = corriged['note']
challenge.isCorriged = corriged['isCorriged']
challenge.validated = noteOn20(
corriged['note']['value'], corriged['note']['total']) > challenge.parcours.validate_condition
db.add(challenge)
db.commit()
db.refresh(challenge)
return challenge
def validate_challenge_input(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection):
data = corr.data
if len(obj) != len(data):
return False
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
same = all([e['calcul'] == f.calcul and len(e['inputs']) == len(f.inputs) for e,f in zipped])
if not same:
return False
return True
def validate_challenge_correction(obj: List[List[CorrigedGeneratorOut]], chall: Challenge):
data = chall.data
if len(obj) != len(data):
return False
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
same = all([e['calcul'] == f.calcul and len(e['inputs']) == len(f.inputs) for e,f in zipped])
if not same:
return False
return True
def corrige_challenge(obj: List[List[ParsedGeneratorOut]], corr: TmpCorrection) -> CorrigedChallenge:
if validate_challenge_input(obj , corr) is False:
return None
data = corr.data
note = 0
total = 0
isCorriged = True
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
for e, f in zipped:
for k, l in zip(e['inputs'], f.inputs):
k["value"] = str(l.value)
total += 1
if k['correction'] is None:
isCorriged = False
if str(k["correction"]) == str(l.value):
note += 1
return {"data": data, "note": {"value": 1, "total": 3}, "isCorriged": isCorriged}
def change_correction(obj: List[List[CorrigedGeneratorOut]], chall: Challenge) -> CorrigedChallenge:
if validate_challenge_correction(obj, chall) is False:
return None
data = deepcopy(chall.data)
note = 0
total = 0
isCorriged = True
for i in range(len(data)):
exo_corr = data[i]
exo = obj[i]
if len(exo) != len(exo_corr):
return
zipped = zip(exo_corr, exo)
for e, f in zipped:
for k, l in zip(e['inputs'], f.inputs):
k["correction"] = str(l.correction)
total += 1
if k['correction'] is None:
isCorriged = False
if str(k["correction"]) == str(l.value):
note += 1
return {"data": data, "note": {"value": note, "total": total}, "isCorriged": isCorriged}
def create_challenge(data: List[List[CorrigedGeneratorOut]], challenger: Member, parcours: Parcours, time: int, note: Note, isCorriged: bool, db: Session):
challenge = Challenge(data=data, challenger=challenger, parcours=parcours, time=time, note=note, validated=noteOn20(note["value"], note['total']) >=
parcours.validate_condition, isCorriged=isCorriged, id_code=generate_unique_code(Challenge, s=db))
db.add(challenge)
db.commit()
db.refresh(challenge)
return challenge
def change_challenge(challenge: Challenge, corriged: CorrigedChallenge, db: Session):
challenge.data = corriged['data']
challenge.note = corriged['note']
challenge.isCorriged = corriged['isCorriged']
challenge.validated = noteOn20(
corriged['note']['value'], corriged['note']['total']) > challenge.parcours.validate_condition
db.add(challenge)
db.commit()
db.refresh(challenge)
return challenge
# Dependencies
def check_room(room_id: str, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
return room
def get_room(room_id, db: Session = Depends(get_session)):
room = db.exec(select(Room).where(Room.id_code == room_id)).first()
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Salle introuvable")
return room
def get_member_dep(room: Room = Depends(get_room), user: User = Depends(get_current_user_optional), clientId: str | None = Body(default=None), db: Session = Depends(get_session)):
if user is None and clientId is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
if user is not None:
member = get_member_from_user(user.id, room.id, db)
if clientId is not None:
member = get_member_from_clientId(clientId, room.id, db)
if member is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous n'êtes pas dans cette salle")
return member
def check_admin(member: Member = Depends(get_member_dep)):
if member.is_admin is False:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Vous devez être administrateur pour faire cela")
return member
def get_parcours(parcours_id: str, room: Room = Depends(get_room), db: Session = Depends(get_session)):
room = db.exec(select(Parcours).where(Parcours.id_code ==
parcours_id, Parcours.room_id == room.id)).first()
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Parcours introuvable")
return room
def get_exercices(parcours: Parcours = Depends(get_parcours), db: Session = Depends(get_session)):
exercices = db.exec(select(Exercice).where(col(Exercice.id_code).in_(
[e['exercice_id'] for e in parcours.exercices]))).all()
return [{"exercice": e, "quantity": [q for q in parcours.exercices if q['exercice_id'] == e.id_code][0]['quantity']} for e in exercices]
def get_correction(correction_id: str, parcours_id: str, member: Member = Depends(get_member_dep), db: Session = Depends(get_session)):
tmpCorr = db.exec(select(TmpCorrection).where(
TmpCorrection.id_code == correction_id, TmpCorrection.parcours_id == parcours_id)).first()
if tmpCorr is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Correction introuvable")
if member != tmpCorr.member:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="It's not your challenge")
return tmpCorr
def get_challenge(challenge_id: str, parcours_id: str, db: Session = Depends(get_session)):
challenge = db.exec(select(Challenge).where(
Challenge.id_code == challenge_id, Challenge.parcours_id == parcours_id)).first()
if challenge is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge introuvable")
if challenge.data == []:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Impossible de corriger ce challenge")
return challenge

View File

@ -1,8 +1,11 @@
from uuid import UUID, uuid4
from pydantic import root_validator, BaseModel
import pydantic.json
from typing import List, Optional, TYPE_CHECKING
from sqlmodel import SQLModel, Field, Relationship
from sqlmodel import SQLModel, Field, Relationship, JSON, Column
from database.auth.models import UserRead
if TYPE_CHECKING:
from database.auth.models import User
@ -10,6 +13,7 @@ if TYPE_CHECKING:
class RoomBase(SQLModel):
name: str = Field(max_length=20)
public: bool = Field(default=False)
global_results: bool = Field(default=False)
class RoomCreate(RoomBase):
pass
@ -19,8 +23,8 @@ class Room(RoomBase, table=True):
id_code: str = Field(index=True)
members: List['Member'] = Relationship(back_populates="room")
parcours: List['Parcours'] = Relationship(back_populates="room")
class AnonymousBase(SQLModel):
username: str = Field(max_length=20)
@ -32,11 +36,13 @@ class Anonymous(AnonymousBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
reconnect_code: str = Field(index=True)
clientId: Optional[UUID] = Field(default=uuid4(), index=True)
member: 'Member' = Relationship(back_populates="anonymous")
class Member(SQLModel, table = True):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: str = Field(index=True)
user_id: Optional[int] = Field(foreign_key="user.id", default=None)
user: Optional["User"] = Relationship(back_populates='members')
@ -47,6 +53,8 @@ class Member(SQLModel, table = True):
room_id: int = Field(foreign_key="room.id")
room: Room = Relationship(back_populates='members')
challenges: List["Challenge"] = Relationship(back_populates="challenger")
is_admin: bool = False
waiting: bool = False
@ -54,21 +62,128 @@ class Member(SQLModel, table = True):
online: bool = False
waiter_code: Optional[str] = Field(default= None)
id_code: str
class RoomRead(RoomBase):
id_code: str
#members: List['Member']
corrections: List['TmpCorrection'] = Relationship(back_populates="member")
class ExercicesCreate(SQLModel):
exercice_id: str
quantity: int = 10
class Exercices(ExercicesCreate):
name: str
class Parcours(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: str = Field(index=True, unique=True)
room_id: int = Field(foreign_key="room.id")
room: Room = Relationship(back_populates='parcours')
name: str
time: int
validate_condition: int
exercices: List[Exercices] = Field(sa_column=Column(JSON))
challenges: List["Challenge"] = Relationship(back_populates="parcours")
corrections: List["TmpCorrection"] = Relationship(back_populates="parcours")
class Note(BaseModel):
value: int
total: int
class TimedNote(Note):
time: int
class ParcoursReadShort(SQLModel):
name: str
best_note: str | None = None
id_code: str
class Challenges(SQLModel):
id_code: str
challenger: str
note: Note
time: int
isCorriged: bool
canCorrige: bool
validated: bool
class ParcoursRead(SQLModel):
name: str
time: int
validate_condition: int
id_code: str
exercices: List[Exercices]
challenges: List[Challenges]
class ParcoursCreate(SQLModel):
name: str
time: int
validate_condition: int
exercices: List[ExercicesCreate]
class NotCorrigedInput(BaseModel):
index: int
value: str
class CorrigedInput(NotCorrigedInput):
correction: str
class ParsedGeneratorOut(BaseModel):
calcul: str
inputs: List[NotCorrigedInput]
class CorrigedGeneratorOut(BaseModel):
calcul: str
inputs: List[CorrigedInput]
class Challenge(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: str = Field(index=True, unique=True)
challenger_id: int = Field(foreign_key="member.id")
challenger: Member = Relationship(back_populates="challenges")
parcours_id: int = Field(foreign_key="parcours.id_code")
parcours: Parcours = Relationship(back_populates="challenges")
data: Optional[List[List[CorrigedGeneratorOut]]] = Field(sa_column=Column(JSON), default=[])
time: int
note: Note = Field(
sa_column=Column(JSON))
validated: bool
isCorriged: bool
class ChallengeRead(SQLModel):
id_code: str
data: Optional[List[List[CorrigedGeneratorOut]]] = []
time: int
note: Note
validated: bool
isCorriged: bool
class TmpCorrection(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: str = Field(index=True)
parcours_id: str = Field(foreign_key="parcours.id_code")
parcours: Parcours = Relationship(back_populates="corrections")
member_id: int = Field(foreign_key="member.id")
member: Member = Relationship(back_populates="corrections")
data: List[List[CorrigedGeneratorOut]] = Field(sa_column=Column(JSON))
class AnonymousRead(AnonymousBase):
reconnect_code: str
class Username(SQLModel):
username: str
class MemberWithRelations(SQLModel):
is_admin: bool
user: UserRead | None = None
anonymous: AnonymousRead | None = None
class MemberRead(SQLModel):
username: str
@ -76,26 +191,25 @@ class MemberRead(SQLModel):
isUser: bool
isAdmin: bool
id_code: str
class MemberSerializer(MemberRead):
member: MemberWithRelations
@root_validator
def parse_member(cls, values):
member = values.get('member')
if member == None:
return values
member_obj = member.user or member.anonymous
if member_obj is None:
raise ValueError('User or anonymous required')
return {"username": member_obj.username, "reconnect_code": getattr(member_obj, "reconnect_code", ""), "isAdmin": member.is_admin, "isUser": member.user != None}
class RoomRead(RoomBase):
id_code: str
class RoomAndMember(BaseModel):
room: RoomRead
member: MemberRead
class RoomInfo(RoomRead):
public: bool
name: str
members: List[MemberRead]
parcours: List[ParcoursReadShort]
class Waiter(BaseModel):
username: str
waiter_id: str
class RoomConnectionInfos(BaseModel):
room: str
member: str | None = None

View File

@ -10,7 +10,7 @@ from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
import async_to_sync as sync
from tortoise.manager import Manager
from generateur.generateur_main import Generateur
from generateur.generateur_main import generate_from_path
from services.io import add_fast_api_root
@ -96,7 +96,7 @@ class Exercice(Model):
if not isinstance(self.exo_source, io.BytesIO) and not isinstance(self.exo_source, io.StringIO):
return {
"type": "Csv" if self.csvSupport() else "web" if self.webSupport() else None,
"data": Generateur(add_fast_api_root(self.exo_source), 3, "csv" if self.csvSupport() else "web" if self.pdfSupport() else None, True) if self.csvSupport() == True or self.webSupport() == True else None
"data": generate_from_path(add_fast_api_root(self.exo_source), 3, "csv" if self.csvSupport() else "web" if self.pdfSupport() else None, True) if self.csvSupport() == True or self.webSupport() == True else None
}
return {}

View File

@ -1,4 +1,4 @@
from .generateur_main import Generateur
from .generateur_main import generate_from_path
PAGE_LINES = {
10: 53,
12: 49,
@ -16,7 +16,7 @@ MAX_LENGTH = {
def Csv_generator(path, nb_in_serie, nb_page, police, consigne, writer):
exo_exemple = Generateur(path, 1, 'csv')
exo_exemple = generate_from_path(path, 1, 'csv')
if len(consigne) < MAX_LENGTH[police] and len(consigne) > len(exo_exemple):
longueur_max = len(consigne) + 5
elif len(consigne) > MAX_LENGTH[police] and len(consigne) > len(exo_exemple):
@ -61,7 +61,7 @@ def Csv_generator(path, nb_in_serie, nb_page, police, consigne, writer):
for k in range(nb_in_serie):
calcul_list = list(
map(lambda calc: calc['calcul'], Generateur(path, nb_in_line, 'csv')))
map(lambda calc: calc['calcul'], generate_from_path(path, nb_in_line, 'csv')))
n = 1
for i in range(n, len(calcul_list) + n + 1, n+1):
calcul_list.insert(i, '')

View File

@ -1,6 +1,41 @@
import random
import re
import importlib.util
import string
from typing import List
from pydantic import BaseModel
import sympy
class GeneratorOut(BaseModel):
calcul: str
correction: str | None = None
def parseOut(calcul):
"""Fait en sorte de séparer la correction présente dans le calcul"""
regex = r"\[(.*?)\]"
calculEx = calcul['calcul'].replace('[', ' [').replace(']', '] ')
splitted = calculEx.split()
if len(list(filter(lambda e: e.startswith("[") and e.endswith(']'), splitted))) == 0:
splitted.append('[]')
inputs = []
for i in range(len(splitted)):
c = splitted[i]
match = re.findall(regex, c)
if len(match) != 0:
correction = c[1:-1]
splitted[i] = f'[{len(inputs)}]'
inputs.append(
{'index': len(inputs), 'correction': correction, 'value': ""})
calculEx = ' '.join(splitted)
return {'calcul': calculEx, 'inputs': inputs}
def parseGeneratorOut(out: List[GeneratorOut]):
return [parseOut(c) for c in out]
def getObjectKey(obj, key):
if obj[key] == None:
@ -13,19 +48,17 @@ def getCorrectionKey(obj, key):
def parseCorrection(calc, replacer='...'):
exp_list = re.findall(r"\[([A-Za-z0-9_]+)\]", calc)
exp_list = re.findall(r"\[(.*?)\]", calc)
for exp in exp_list:
calc = calc.replace(f'[{exp}]', replacer)
return calc
def Generateur(path, quantity, key, forcedCorrection=False):
spec = importlib.util.spec_from_file_location(
"tmp", path)
tmp = importlib.util.module_from_spec(spec)
spec.loader.exec_module(tmp)
def generate_from_data(data, quantity, key, forcedCorrection=False):
locs = {}
exec(data, {"random": random, "string": string, "sympy": sympy}, locs)
try:
main_func = tmp.main
main_func = locs['main']
except:
return None
main_result = main_func()
@ -38,12 +71,18 @@ def Generateur(path, quantity, key, forcedCorrection=False):
correction_key = getCorrectionKey(result_object, key)
op_list = []
try:
replacer = tmp.CORRECTION_REPLACER
replacer = locs["CORRECTION_REPLACER"]
except:
replacer = '...'
for i in range(quantity):
main_result = main_func()
main = {**default_object, **main_result}
op_list.append({'calcul': parseCorrection(main[
object_key], replacer) if (forcedCorrection or (key != 'web' and main['correction'] == False)) else main[object_key], 'correction': main[correction_key]})
object_key], replacer) if (forcedCorrection or (key != 'web' and main['correction'] == False)) else main[object_key], "correction": main[correction_key]})
return op_list
def generate_from_path(path, quantity, key, forcedCorrection=False):
data = open(path, "r").read()
return generate_from_data(data, quantity, key, forcedCorrection)

View File

@ -1,5 +1,6 @@
#import schemas.base
from sqlmodel import SQLModel, Field
from services.database import generate_unique_code
from sqlmodel import SQLModel, Field, select
from services.password import get_password_hash
from sqlmodel import Session, select
from database.auth.crud import create_user_db
@ -7,8 +8,6 @@ from services.auth import get_current_user_optional, jwt_required
from fastapi.openapi.utils import get_openapi
from database.auth.models import User, UserRead
from database.exercices.models import Exercice, ExerciceRead
from database.room.models import Room, Anonymous, MemberWithRelations
import database.db
from fastapi_pagination import add_pagination
from fastapi.responses import PlainTextResponse
from fastapi.exceptions import RequestValidationError, ValidationError
@ -22,7 +21,7 @@ from fastapi import FastAPI, HTTPException, params
from tortoise import Tortoise
from fastapi.middleware.cors import CORSMiddleware
from tortoise.contrib.fastapi import register_tortoise
from pydantic import BaseModel
from pydantic import BaseModel, validator
from database.db import create_db_and_tables, get_session
from services.jwt import revoke_access, revoke_refresh
import routes.base
@ -54,12 +53,50 @@ admin = Admin(app, engine)
class UserAdmin(ModelView, model=User):
column_list = [User.id, User.username]
admin.add_view(UserAdmin)
class Id_codeField(str):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, value, values, config, field):
print("validator", cls, value, values, config, field)
return value
class Test(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: Id_codeField
''' @validator('id_code', always=True)
def test(cls,value, values):
print('VAlIDATE')
session= get_session()
session = next(session)
y = session.exec(select(cls)).all()
print(y)
code = generate_unique_code(cls, s=session)
print(code)
return code '''
@app.on_event("startup")
def on_startup():
create_db_and_tables()
def t(test_1: str):
return test_1 + "lol"
def t2(test_1: str, test_2: str):
return test_1 + test_2
def t3(test_1: str):
if test_1 == '3':
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="non")
@app.post('/test/{test_1}/{test_2}', dependencies=[Depends(t3)])
def test(test_1: str, test_2: str, test_3: str = Depends(t), test_4: str = Depends(t2)):
return {"t1": test_1, "t2": test_2, "t3": test_3, "t4": test_4}
@app.exception_handler(RequestValidationError)
@app.exception_handler(ValidationError)
@ -67,7 +104,8 @@ async def validation_exception_handler(request, exc: RequestValidationError|Vali
errors = {}
print(exc.errors())
for e in exc.errors():
errors[e['loc'][-1] + "_error"] = e['msg']
locs = [e for e in e['loc'] if type(e) == str]
errors[locs[-1] + "_error"] = e['msg']
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,

View File

@ -1,11 +1,12 @@
from enum import Enum
from typing import List
from fastapi import APIRouter, Depends, UploadFile, HTTPException, status
from fastapi import APIRouter, Depends, Path, Query, UploadFile, HTTPException, status
from database.auth.models import User
from database.db import get_session
from database.exercices.models import Exercice, ExerciceCreate, ExerciceEdit, ExerciceRead, ExercicesTagLink, Tag, TagCreate, TagRead
from services.auth import get_current_user, get_current_user_optional
from sqlmodel import Session, select, col
from database.exercices.crud import add_tags_db, check_exercice_author, check_tag_author, create_exo_db, delete_exo_db, get_exo_dependency, clone_exo_db, parse_exo_tags, remove_tag_db, serialize_exo, update_exo_db, get_tags_dependency
from database.exercices.crud import add_tags_db, check_exercice_author, check_private, check_tag_author, create_exo_db, delete_exo_db, get_exo_dependency, clone_exo_db, parse_exo_tags, remove_tag_db, serialize_exo, update_exo_db, get_tags_dependency
from services.exoValidation import validate_file, validate_file_optionnal
from services.io import add_fast_api_root, get_filename_from_path
from fastapi.responses import FileResponse
@ -13,27 +14,33 @@ from sqlmodel import func
router = APIRouter(tags=['exercices'])
class ExoType(str, Enum):
csv="csv"
pdf="pdf"
web="web"
def filter_exo_by_tags(exos: List[tuple[Exercice, str]], tags: List[Tag]):
valid_exos = [exo for exo, tag in exos if all(
str(t) in tag.split(',') for t in tags)]
return valid_exos
def queryFilters_dependency(search: str = "", tags: List[int] | None = Depends(get_tags_dependency)):
return search, tags
def queryFilters_dependency(search: str = "", tags: List[int] | None = Depends(get_tags_dependency), type: ExoType | None = Query(default = None)):
return search, tags, type
@router.post('/exercices', response_model=ExerciceRead, status_code=status.HTTP_201_CREATED)
def create_exo(exercice: ExerciceCreate = Depends(ExerciceCreate.as_form), file: UploadFile = Depends(validate_file), user: User = Depends(get_current_user), db: Session = Depends(get_session)):
file_obj = file.file._file
file_obj.name = file.filename
file_obj = file['file'].file._file
file_obj.name = file['file'].filename
exo_obj = create_exo_db(exercice=exercice, user=user,
exo_source=file_obj, db=db)
exo_source=file_obj, supports=file['supports'],db=db)
return serialize_exo(exo=exo_obj, user_id=user.id, db=db)
@router.post('/clone/{id_code}', response_model=ExerciceRead)
def clone_exo(exercice: Exercice | None = Depends(get_exo_dependency), user: User = Depends(get_current_user), db: Session = Depends(get_session)):
def clone_exo(exercice: Exercice | None = Depends(check_private), user: User = Depends(get_current_user), db: Session = Depends(get_session)):
if not exercice:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={
"Exercice introuvable"})
@ -47,7 +54,7 @@ def clone_exo(exercice: Exercice | None = Depends(get_exo_dependency), user: Use
@router.get('/exercices/user', response_model=List[ExerciceRead])
def get_user_exercices(user: User = Depends(get_current_user), queryFilters: tuple[str, List[int] | None] = Depends(queryFilters_dependency), db: Session = Depends(get_session)):
search, tags = queryFilters
search, tags, type = queryFilters
if tags is not None and len(tags) != 0:
statement = select(Exercice, func.group_concat(
@ -57,7 +64,14 @@ def get_user_exercices(user: User = Depends(get_current_user), queryFilters: tup
statement = statement.where(Exercice.author_id == user.id)
statement = statement.where(Exercice.name.startswith(search))
if type == ExoType.csv:
statement = statement.where(Exercice.csv == True)
if type == ExoType.pdf:
statement = statement.where(Exercice.pdf == True)
if type == ExoType.web:
statement = statement.where(Exercice.web == True)
if tags is not None and len(tags) != 0:
statement = statement.join(ExercicesTagLink).where(
col(ExercicesTagLink.tag_id).in_(tags)).group_by(ExercicesTagLink.exercice_id)
@ -73,7 +87,7 @@ def get_user_exercices(user: User = Depends(get_current_user), queryFilters: tup
@router.get('/exercices/public', response_model=List[ExerciceRead])
def get_public_exercices(user: User | None = Depends(get_current_user_optional), queryFilters: tuple[str, List[int] | None] = Depends(queryFilters_dependency), db: Session = Depends(get_session)):
search, tags = queryFilters
search, tags, type = queryFilters
if user is not None:
if tags is not None and len(tags) != 0:
@ -86,6 +100,13 @@ def get_public_exercices(user: User | None = Depends(get_current_user_optional),
statement = statement.where(Exercice.private == False)
statement = statement.where(Exercice.name.startswith(search))
if type == ExoType.csv:
statement = statement.where(Exercice.csv == True)
if type == ExoType.pdf:
statement = statement.where(Exercice.pdf == True)
if type == ExoType.web:
statement = statement.where(Exercice.web == True)
if tags is not None and len(tags) != 0:
statement = statement.join(ExercicesTagLink).where(
col(ExercicesTagLink.tag_id).in_(tags)).group_by(ExercicesTagLink.exercice_id)
@ -102,12 +123,18 @@ def get_public_exercices(user: User | None = Depends(get_current_user_optional),
statement = select(Exercice)
statement = statement.where(Exercice.private == False)
statement = statement.where(Exercice.name.startswith(search))
if type == ExoType.csv:
statement = statement.where(Exercice.csv == True)
if type == ExoType.pdf:
statement = statement.where(Exercice.pdf == True)
if type == ExoType.web:
statement = statement.where(Exercice.web == True)
exercices = db.exec(statement).all()
return [serialize_exo(exo=e, user_id=None, db=db) for e in exercices]
@router.get('/exercice/{id_code}', response_model=ExerciceRead)
def get_exercice(exo: Exercice = Depends(get_exo_dependency), user: User | None = Depends(get_current_user_optional), db: Session = Depends(get_session)):
def get_exercice(exo: Exercice = Depends(check_private), user: User | None = Depends(get_current_user_optional), db: Session = Depends(get_session)):
return serialize_exo(exo=exo, user_id=getattr(user, 'id', None), db=db)
@ -122,9 +149,9 @@ def update_exo(file: UploadFile = Depends(validate_file_optionnal), exo: Exercic
file_obj = None
if file:
file_obj = file.file._file
file_obj.name = file.filename
exo_obj = update_exo_db(exo, exercice, file_obj, db)
file_obj = file["file"].file._file
file_obj.name = file['file'].filename
exo_obj = update_exo_db(exo, exercice,file['supports'] if file is not None else None, file_obj, db)
return serialize_exo(exo=exo_obj, user_id=exo_obj.author_id, db=db)

View File

@ -1,247 +0,0 @@
from pydantic import BaseModel
import csv
import io
from typing import List
from fastapi.exceptions import HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from fastapi import APIRouter, Depends, UploadFile, status, Query
from services.database import get_exo
from services.io import add_fast_api_root, get_filename_from_path
from database.exercices.crud import add_tag_db, clone_exo_db, create_exo_db, delete_exo_db, get_exo_source_path, update_exo_db
from services.exoValidation import validate_file
from database.auth.models import User
from services.auth import check_author_exo, get_current_user, get_current_user_optional
from database.exercices.models import Exercice, Tag
from fastapi_jwt_auth import AuthJWT
from schemas.exercices import ExerciceIn_form, ExerciceSchema, Exercices_schema, Exercice_schema, Exercices_withoutTags, Tag_schema, TagFull_schema, TagIn
from fastapi_pagination import paginate, Page
from generateur.generateur_csv import Csv_generator
router = APIRouter(tags=['exercices'])
def get_exercice_data(exo: Exercice, user: User | None = None):
return {}
# Exercices
class Exo(BaseModel):
name: str
id_code: str
tags: List[TagFull_schema]
async def get_tags(tags: List[str] | None=Query(None)):
if tags is None:
return None
validated_tags = []
for t in tags:
tag = await Tag.get_or_none(id_code=t)
if tag is not None:
validated_tags.append(tag)
return validated_tags
async def get_exo_by_tags(exos: Query, tags: List[Tag]):
valid_exos = []
for e in exos:
exo_tags = await e.tags.all()
if (all(t in exo_tags for t in tags)):
valid_exos.append(e)
return valid_exos
@router.get("/exercices", response_model=Page[Exo])
async def get_exercices(search: str = "", tags: List[Tag] | None = Depends(get_tags)):
exos = Exercice.all()
print(Exo.schema_json(indent=4))
print(Exercice_schema.schema_json(indent=4))
if tags != None:
exos = await exos.filter(tags__id_code__in=[t.id_code for t in tags]).distinct()
exos = await get_exo_by_tags(exos, tags)
print(exos, [e.id_code for e in exos])
exos = Exercice.filter(id_code__in = [e.id_code for e in exos])
print( await exos)
exos = exos.filter(name__icontains= search)
exo_list = await Exercices_schema.from_queryset(exos)
return paginate(exo_list)
async def get_exo_with_user_tags(exo: Exercice, user: User) -> Exercices_schema:
exo_data = await Exercice_schema.from_tortoise_orm(exo)
exo_tags = await exo.tags.all()
exo_data = {**exo_data.dict(), 'tags': await TagFull_schema.from_queryset(exo.tags.filter(owner_id=user.id))}
return exo_data
@router.get('/exercices/user', response_model=Page[Exercices_schema])
async def get_user_exercices(user: User = Depends(get_current_user), search: str = "", tags: List[Tag] | None = Depends(get_tags)):
exos = Exercice.filter(author_id=user.id)
print('tatgs', tags)
if tags != None:
print('lolilol')
exos = await exos.filter(tags__id_code__in=[t.id_code for t in tags]).distinct()
exos = await get_exo_by_tags(exos, tags)
exos = Exercice.filter(id_code__in=[e.id_code for e in exos])
exos = await exos.filter(name__icontains=search)
exo_list = [await get_exo_with_user_tags(e, user) for e in exos]
print(len(exo_list))
return paginate(exo_list)
def exclude_dict_key(dict, key):
return {k: v for k, v in dict.items() if k != key}
@router.get('/exercices/public', response_model=Page[Exercices_schema | Exercices_withoutTags])
async def get_public_exercices(user: User | None = Depends(get_current_user_optional), search: str = "", tags: List[Tag] | None = Depends(get_tags)):
is_authenticated = user != None
if is_authenticated:
exos = Exercice.filter(author_id__not=user.id, private = False, isOriginal=True)
if tags != None:
exos = await exos.filter(tags__id_code__in=[t.id_code for t in tags]).distinct()
exos = await get_exo_by_tags(exos, tags)
exos = Exercice.filter(id_code__in=[e.id_code for e in exos])
else:
exos = Exercice.filter(private=False, isOriginal=True)
exos = await exos.filter(name__icontains=search)
if is_authenticated:
exo_list = [await get_exo_with_user_tags(e, user) for e in exos]
else:
exo_list = await Exercices_withoutTags.from_queryset(Exercice.all())
return paginate(exo_list)
async def get_exo_tags(exo: Exercice, user: User) -> Exercice_schema:
exo_data = await Exercice_schema.from_tortoise_orm(exo)
exo_tags = await exo.tags.all()
exo_data = {**exo_data.dict(), 'tags': await TagFull_schema.from_queryset(exo.tags.filter(owner_id=user.id))}
return exo_data
@router.get('/exercice/{id_code}', response_model=ExerciceSchema)
async def get_exercice(id_code: str, user: User = Depends(get_current_user_optional)):
is_authenticated = user != None
print(TagFull_schema.schema_json(indent=4), '\n\n', ExerciceSchema.schema_json(indent=4))
exo = await Exercice.get_or_none(id_code=id_code)
if exo is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="exercice not found")
if is_authenticated:
author = await exo.author
is_author = author.id == user.id
exo_obj = await Exercice_schema.from_tortoise_orm(exo)
exo_obj = await get_exo_tags(exo, user)
print(exo_obj)
exo_dict = exo_obj
exo_dict['is_author'] = is_author
exo_dict['exo_source_name'] = get_filename_from_path(exo.exo_source)
return exo_dict
exo_obj = await Exercice_schema.from_tortoise_orm(exo)
exo_dict = exo_obj.dict()
exo_dict['is_author'] = False
exo_dict['exo_source_name'] = get_filename_from_path(exo.exo_source)
exo_dict['tags'] = []
return exo_dict
@router.post("/exercices", response_model=Exercice_schema)
async def create_exercice(file: UploadFile = Depends(validate_file), exo: ExerciceIn_form = Depends(ExerciceIn_form.as_form), user: User = Depends(get_current_user)):
file_obj = file.file._file
file_obj.name = file.filename
exo_obj = await create_exo_db(**{**exo.dict(exclude_unset=True)}, exo_source=file_obj, author_id=user.id)
return await Exercice_schema.from_tortoise_orm(exo_obj)
@router.delete("/exercice/{id_code}")
async def delete_exercice(id_code: str, author: User = Depends(check_author_exo)):
await delete_exo_db(id_code)
return {'detail': "Exercice successfully deleted"}
@router.put("/exercice/{id_code}", response_model=Exercice_schema)
async def update_exercice(id_code: str, file: UploadFile = None, exo: ExerciceIn_form = Depends(ExerciceIn_form.as_form), author: User = Depends(check_author_exo)):
file_obj = None
if file != None:
file_obj = file.file._file
file_obj.name = file.filename
exo_obj = await update_exo_db(id_code, exo_source=file_obj, ** {**exo.dict(exclude_unset=True)})
return await Exercice_schema.from_tortoise_orm(exo_obj)
@router.post('/exercices/{id_code}/clone', response_model=Exercice_schema)
async def clone_exercice(id_code: str, user: User = Depends(get_current_user)):
exo_obj = await clone_exo_db(id_code, user.id)
return await Exercice_schema.from_tortoise_orm(exo_obj)
@router.get('/exercices/{id_code}/exo_source')
async def get_exo_source(id_code: str, author: User = Depends(check_author_exo)):
path = await get_exo_source_path(id_code)
filename = get_filename_from_path(path)
return FileResponse(path, headers={'content-disposition': 'attachment;filename='+filename})
# Tags
@router.get('/tags')
async def get_tags(user: User = Depends(get_current_user)):
return await Tag_schema.from_queryset(user.tags.all())
@router.post('/tags/{id_code}')
async def add_tags(id_code: str, tags: List[TagIn], user: User = Depends(get_current_user)):
exo = await Exercice.get(id_code = id_code)
exercice = await add_tag_db(exo, tags, user.id)
return {'exo': await get_exo_with_user_tags(exo, user), 'tags': await TagFull_schema.from_queryset(user.tags.all())}
async def check_tag_owner(tag_id: str, user: User):
tag = await Tag.get(id_code=tag_id)
owner = await tag.owner
if owner.id != user.id:
raise HTTPException(status_code = status.HTTP_401_UNAUTHORIZED, detail="Vous n'êtes pas le créateur du tag")
return tag
@router.delete('/tags/{id_code}/{tag_id}')
async def delete_tag(id_code:str,tag_id: str, user: User = Depends(get_current_user)):
tag = await check_tag_owner(tag_id, user)
exo = await Exercice.get(id_code=id_code)
await exo.tags.remove(tag)
return await get_exo_with_user_tags(exo, user)
# Generation
@router.get('/generator/csv/{exo_id}')
async def generate_csv(exo_id: str, filename: str):
exo = await Exercice.get(id_code=exo_id)
if exo.csvSupport == False:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail='Impossible de générer cet exercice sur le support csv')
source_path = add_fast_api_root(exo.exo_source)
consigne = exo.consigne
buffer = io.StringIO()
writer = csv.writer(buffer, delimiter=',',
quotechar=',', quoting=csv.QUOTE_MINIMAL, dialect='excel') # mettre | comme sep un jour
Csv_generator(source_path, 10, 10, 12, consigne, writer)
return StreamingResponse(iter([buffer.getvalue()]), headers={"Content-Disposition": f'attachment;filename="{filename}'}, media_type='text/csv')

View File

@ -3,7 +3,7 @@ from services.websocket import Consumer
from typing import Any, TYPE_CHECKING
from database.room.models import Room, Member, MemberRead, Waiter
from sqlmodel import Session
from database.room.crud import serialize_member,check_user_in_room, create_anonymous, create_member, create_room_db, delete_member, get_member, get_member_from_token, get_member_from_reconnect_code, connect_member, disconnect_member, create_anonymous_member, create_anonymous_waiter, create_user_member, create_user_waiter, get_or_create_member, get_waiter, accept_waiter, leave_room, refuse_waiter, check_room
from database.room.crud import change_room_name, change_room_status, serialize_member,check_user_in_room, create_anonymous, create_member, get_member, get_member_from_token, get_member_from_reconnect_code, connect_member, disconnect_member, create_anonymous_member, create_anonymous_waiter, create_user_member, create_user_waiter, get_or_create_member, get_waiter, accept_waiter, leave_room, refuse_waiter, check_room
from database.auth.crud import get_user_from_token
if TYPE_CHECKING:
from routes.room.routes import RoomManager
@ -26,16 +26,16 @@ class RoomConsumer(Consumer):
await self.ws.send_json({'type': type, "data": payload})
async def send_to_admin(self, type: str, payload: Any, exclude: bool = False):
await self.manager.send_to_admin(self.room.id, {'type': type, "data": payload})
await self.manager.send_to_admin(self.room.id_code, {'type': type, "data": payload})
async def send_to(self, type: str, payload: Any, member_id, exclude: bool = False):
await self.manager.send_to(self.room.id, member_id, {'type': type, "data": payload})
await self.manager.send_to(self.room.id_code, member_id, {'type': type, "data": payload})
async def broadcast(self, type, payload, exclude=False):
await self.manager.broadcast({"type": type, "data": payload}, self.room.id, exclude=[exclude == True and self])
await self.manager.broadcast({"type": type, "data": payload}, self.room.id_code, exclude=[exclude == True and self])
def add_to_group(self):
self.manager.add(self.room.id, self)
self.manager.add(self.room.id_code, self)
async def connect_self(self):
if isinstance(self.member, Member):
@ -55,7 +55,8 @@ class RoomConsumer(Consumer):
self.member = member
await self.connect_self()
self.add_to_group()
await self.direct_send(type="loggedIn", payload={"member": serialize_member(self.member)})
clientId = self.member.anonymous.clientId if self.member.anonymous is not None else ""
await self.direct_send(type="loggedIn", payload={"member": {**serialize_member(self.member), 'clientId': str(clientId)}})
async def send_error(self, msg):
await self.direct_send(type="error", payload={"msg": msg})
@ -79,6 +80,7 @@ class RoomConsumer(Consumer):
def isWaiter(self):
return self.member is not None and self.member.waiting == True
# Received Events
@Consumer.event('login')
async def login(self, token: str | None = None, reconnect_code: str | None = None):
if reconnect_code is None and token is None:
@ -164,6 +166,31 @@ class RoomConsumer(Consumer):
@Consumer.event('ping_room')
async def proom(self):
await self.broadcast(type='ping', payload={}, exclude=True)
@Consumer.event('sub_parcours')
async def sub_parcours(self, parcours_id: str):
if isinstance(self.member, Member) and self.member.waiting == False:
self.manager.add(parcours_id, self)
@Consumer.event('unsub_parcours')
async def unsub_parcours(self, parcours_id: str):
if isinstance(self.member, Member) and self.member.waiting == False:
self.manager.remove(parcours_id, self)
@Consumer.event('set_name', conditions=[isAdminReceive])
async def change_name(self, name: str):
if len(name) < 20:
self.room = change_room_name(self.room,name, self.db)
print('SENDING')
await self.broadcast(type="new_name", payload={"name": name})
return
await self.send_error('Nom trop long (max 20 character)')
@Consumer.event('set_visibility', conditions=[isAdminReceive])
async def change_name(self, public: bool):
self.room = change_room_status(self.room, public, self.db)
await self.broadcast(type="new_visibility", payload={"public": public})
async def isConnected(self):
if self.member is None:

View File

@ -1,7 +1,9 @@
from typing import TYPE_CHECKING, Dict, List
from starlette.websockets import WebSocketState
if TYPE_CHECKING:
from routes.room.consumer import RoomConsumer
class RoomManager:
def __init__(self):
self.active_connections: Dict[str, List["RoomConsumer"]] = {}
@ -13,27 +15,35 @@ class RoomManager:
if member not in self.active_connections[group]:
self.active_connections[group].append(member)
async def _send(self, connection: "RoomConsumer", message, group: str):
if connection.ws.application_state == WebSocketState.DISCONNECTED:
self.remove(group, connection)
elif connection.ws.application_state == WebSocketState.CONNECTED:
await connection.send(message)
def remove(self, group: str, member: "RoomConsumer"):
if group in self.active_connections:
if member in self.active_connections[group]:
self.active_connections[group].remove(member)
async def broadcast(self, message, group: str, exclude: list["RoomConsumer"] = []):
print('BROADCaST', message)
if group in self.active_connections:
for connection in list(set(self.active_connections[group])):
# print(connection)
if connection not in exclude:
await connection.send(message)
await self._send(connection, message, group)
async def send_to(self, group, id_code, msg):
if group in self.active_connections:
members = [c for c in self.active_connections[group]
if c.member.id_code == id_code]
for m in members:
await m.send(msg)
await self._send(m, msg, group)
async def send_to_admin(self, group, msg):
if group in self.active_connections:
members = [c for c in self.active_connections[group]
if c.member.is_admin == True]
for m in members:
await m.send(msg)
await self._send(m, msg, group)

View File

@ -1,37 +1,138 @@
from database.room.crud import serialize_member,check_user_in_room, create_anonymous, create_member, create_room_db, delete_member, get_member, get_member_from_token, get_member_from_reconnect_code, connect_member, disconnect_member, create_anonymous_member, create_anonymous_waiter, create_user_member, create_user_waiter, get_or_create_member, get_waiter, accept_waiter, leave_room, refuse_waiter, check_room
from services.database import generate_unique_code
from services.io import add_fast_api_root
from generateur.generateur_main import generate_from_path, parseGeneratorOut, parseOut
from database.exercices.models import Exercice
from database.room.crud import CorrigedChallenge, change_correction, corrige_challenge, create_parcours_db, delete_parcours_db, create_room_db, get_member_dep, check_room, serialize_room, update_parcours_db, get_parcours, get_room, check_admin, get_exercices, get_challenge, get_correction, create_tmp_correction, create_challenge, change_challenge
from pydantic import BaseModel
from typing import Any, Callable, Dict, List, Optional
from fastapi import APIRouter, Depends, WebSocket, status, Query
from fastapi import APIRouter, Depends, WebSocket, status, Query, Body
from config import ALGORITHM, SECRET_KEY
from database.auth.crud import get_user_from_clientId_db
from database.auth.models import User
from database.db import get_session
from sqlmodel import Session
from database.room.models import Room, RoomCreate, RoomAndMember
from sqlmodel import Session, col, select
from database.room.models import Challenge, ChallengeRead, Challenges, CorrigedGeneratorOut, Member, Note, Parcours, ParcoursCreate, ParcoursRead, ParcoursReadShort, ParsedGeneratorOut, Room, RoomConnectionInfos, RoomCreate, RoomAndMember, RoomInfo, TmpCorrection
from routes.room.consumer import RoomConsumer
from routes.room.manager import RoomManager
from services.auth import get_current_user_optional
from fastapi.exceptions import HTTPException
from database.auth.crud import get_user_from_token
from services.websocket import Consumer
from services.misc import noteOn20, stripKeyDict
router = APIRouter(tags=["room"])
@router.post('/room', response_model=RoomAndMember)
def create_room(room: RoomCreate, username: Optional[str] = Query(default=None, max_length=20), user: User | None = Depends(get_current_user_optional), db: Session = Depends(get_session)):
room_obj = create_room_db(room=room, user=user, username=username, db=db)
return {'room': room_obj['room'], "member": serialize_member(room_obj['member'])}
manager = RoomManager()
def get_manager():
return manager
@router.post('/room', response_model=RoomConnectionInfos)
def create_room(room: RoomCreate, username: Optional[str] = Query(default=None, max_length=20), user: User | None = Depends(get_current_user_optional), db: Session = Depends(get_session)):
room_obj = create_room_db(room=room, user=user, username=username, db=db)
return {'room': room_obj['room'].id_code, "member": getattr(room_obj['member'].anonymous, "reconnect_code", None)}
@router.get('/room/{room_id}', response_model=RoomInfo)
def get_room_route(room: Room = Depends(get_room), member: Member = Depends(get_member_dep), db: Session = Depends(get_session)):
return serialize_room(room, member, db)
@router.post('/room/{room_id}/parcours', response_model=ParcoursRead)
async def create_parcours(*, parcours: ParcoursCreate, room_id: str, member: Member = Depends(check_admin), m: RoomManager = Depends(get_manager), db: Session = Depends(get_session)):
parcours_obj = create_parcours_db(parcours, member.room_id, db)
if type(parcours_obj) == str:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=parcours_obj)
await m.broadcast({"type": "add_parcours", "data": {"parcours": ParcoursReadShort(**parcours_obj.dict(exclude_unset=True)).dict()}}, room_id)
return parcours_obj
@router.get('/room/{room_id}/parcours/{parcours_id}', response_model=ParcoursRead)
async def get_parcours_route(*, parcours: Parcours = Depends(get_parcours), member: Member = Depends(get_member_dep), db: Session = Depends(get_session)):
if member.is_admin == False:
return {**parcours.dict(), "challenges": [Challenges(**{**chall.dict(), "challenger": member.id_code, "canCorrige": chall.data != []}) for chall in parcours.challenges if chall.challenger.id_code == member.id_code]}
if member.is_admin == True:
return {**parcours.dict(), "challenges": [Challenges(**{**chall.dict(), "challenger": member.id_code, "canCorrige": chall.data != []}) for chall in parcours.challenges]}
@router.put('/room/{room_id}/parcours/{parcours_id}', response_model=ParcoursRead, dependencies=[Depends(check_admin)])
async def update_parcours(*, room_id: str, parcours: ParcoursCreate, parcours_old: Parcours = Depends(get_parcours), m: RoomManager = Depends(get_manager), db: Session = Depends(get_session)):
parcours_obj = update_parcours_db(parcours, parcours_old, db)
if type(parcours_obj) == str:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=parcours_obj)
short = ParcoursReadShort(
**parcours_obj.dict(exclude_unset=True), id_code=parcours_obj.id_code)
await m.broadcast({"type": "update_parcours", "data": {"parcours": short.dict()}}, room_id)
return parcours_obj
@router.delete('/room/{room_id}/parcours/{parcours_id}', dependencies=[Depends(check_admin)])
async def delete_parcours(parcours: Parcours = Depends(get_parcours), m: RoomManager = Depends(get_manager), db: Session = Depends(get_session)):
delete_parcours_db(parcours, db)
await m.broadcast({"type": "del_parcours", "data": {"parcours_id": parcours.id_code}}, parcours.room.id_code)
return "ok"
class Exos(BaseModel):
exercice: Exercice
quantity: int
@router.get('/room/{room_id}/challenge/{parcours_id}')
def challenge_route(parcours_id: str, exercices: List[Exos] = Depends(get_exercices), member: Member = Depends(get_member_dep), db: Session = Depends(get_session)):
correction = [parseGeneratorOut(generate_from_path(add_fast_api_root(
e['exercice'].exo_source), e['quantity'], "web")) for e in exercices]
sending = [[{**c, 'inputs': [stripKeyDict(i, "correction")
for i in c['inputs']]} for c in e] for e in correction]
tmpCorr = create_tmp_correction(correction, parcours_id, member, db)
return {'challenge': sending, "id_code": tmpCorr.id_code}
@router.post('/room/{room_id}/challenge/{parcours_id}/{correction_id}', response_model=ChallengeRead)
async def send_challenge(*, challenge: List[List[ParsedGeneratorOut]], correction: TmpCorrection = Depends(get_correction), time: int = Body(), db: Session = Depends(get_session), m: RoomManager = Depends(get_manager),):
parcours = correction.parcours
member = correction.member
data = corrige_challenge(challenge, correction)
if data is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail={"challenge_error":"Object does not correspond to correction"})
chall = create_challenge(**data, challenger=member,
parcours=parcours, time=time, db=db)
await m.broadcast({"type": "challenge", "data": Challenges(**{**chall.dict(), "challenger": member.id_code, "canCorrige": chall.data != []}).dict()}, parcours.id_code)
db.delete(correction)
db.commit()
return chall
@router.get('/room/{room_id}/challenge/{parcours_id}/{challenge_id}', response_model=ChallengeRead, dependencies=[Depends(get_member_dep)])
async def challenge_read(*, challenge: Challenge = Depends(get_challenge)):
return challenge
@router.put('/room/{room_id}/challenge/{parcours_id}/{challenge_id}', response_model=ChallengeRead, dependencies=[Depends(check_admin)])
async def corrige(*, correction: List[List[CorrigedGeneratorOut]], challenge: Challenge = Depends(get_challenge), db: Session = Depends(get_session), m: RoomManager = Depends(get_manager),):
parcours = challenge.parcours
member = challenge.challenger
data = change_correction(correction, challenge)
if data is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail={"correction_error": "Object does not correspond to challenge"})
challenge = change_challenge(challenge, data, db)
await m.broadcast({"type": "challenge_change", "data": Challenges(**{**challenge.dict(), "challenger": member.id_code, "canCorrige": challenge.data != []}).dict()}, parcours.id_code)
return challenge
@router.websocket('/ws/room/{room_id}')
async def room_ws(ws: WebSocket, room: Room | None = Depends(check_room), db: Session = Depends(get_session)):
async def room_ws(ws: WebSocket, room: Room | None = Depends(check_room), db: Session = Depends(get_session), m: RoomManager = Depends(get_manager)):
if room is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Room not found')
consumer = RoomConsumer(ws=ws, room=room, manager=manager, db=db)
consumer = RoomConsumer(ws=ws, room=room, manager=m, db=db)
await consumer.run()

View File

@ -1,9 +1,10 @@
import random
import string
from sqlmodel import select, Session
from sqlmodel import select, Session, Field
from sqlmodel import SQLModel
from typing import Optional
from pydantic import validator
from database.db import get_session
def generate_unique_code(model: SQLModel, s: Session, field_name='id_code', length: int = 6):
if getattr(model, field_name, None) is None:
raise KeyError("Invalid field name")
@ -15,3 +16,15 @@ def generate_unique_code(model: SQLModel, s: Session, field_name='id_code', len
break
return code
class SQLModelWithIdCode(SQLModel):
id: Optional[int] = Field(default=None, primary_key=True)
id_code: Optional[str] = Field(default=None, unique=True, index=True)
@validator('id_code', always=True)
def generate_id_code(cls, value, values):
if value is None:
sessions = get_session()
session = next(sessions)
code = generate_unique_code(cls, s=session)
return code
return value

View File

@ -1,13 +1,19 @@
import ast
from multiprocessing import Manager
import random
import re
import string
from fastapi.exceptions import HTTPException
from fastapi import UploadFile, status
import sympy
from tortoise.validators import Validator
from tortoise.exceptions import ValidationError
import types
from services.timeout import timeout
from services.timeout import Process
from services.io import is_binary_file
import os
VALIDATED_MODULES = ["random", "string", "sympy"]
def checkExoSupportCompatibility(obj):
isPdf = False if (obj['pdf'] == None or (
obj['calcul'] == False and obj['pdf'] == False)) else True
@ -25,9 +31,9 @@ def checkExoSupportCompatibility(obj):
def get_module_from_string(value: str, *args, **kwargs):
locs = {}
try:
exec(value, dict(), locs)
exec(value, {"random": random, "string": string, "sympy": sympy}, locs)
except Exception as err:
raise ValueError(err)
raise ValueError(err.args[0])
return locs
@ -42,8 +48,8 @@ def execute_main_if_avalaible(spec, *args, **kwargs):
def get_spec_with_timeout(data, time):
return get_module_from_string(data)
with timeout(time, ValidationError('[Error] : Script took too long')):
return get_module_from_string(data)
''' with timeout(time, ValidationError('[Error] : Script took too long')):
return get_module_from_string(data) '''
def fill_empty_values(object):
@ -52,14 +58,34 @@ def fill_empty_values(object):
return {**default_object, **object}
def get_support_from_data(data: str):
locs = get_spec_with_timeout(data, 5)
result = execute_main_if_avalaible(locs)
result = fill_empty_values(result)
def get_results(data, resultObj):
locs = get_spec_with_timeout(data, 5)
result = execute_main_if_avalaible(locs)
result = fill_empty_values(result)
resultObj['result'] = result
def get_result_with_timeout(data):
m = Manager()
result = m.dict()
p = Process(target=get_results, args=(data, result))
p.start()
p.join(timeout=3)
p.terminate()
if p.exception:
error, traceback = p.exception
raise ValueError(error.args[0])
return result.get('result', None)
exo_supports_compatibility = checkExoSupportCompatibility(result)
return exo_supports_compatibility
def get_support_from_data(data: str):
result = get_result_with_timeout(data)
if result is None:
raise ValueError('Script took too long')
exo_supports_compatibility = checkExoSupportCompatibility(result)
return exo_supports_compatibility
def get_support_from_path(path: str):
@ -77,8 +103,40 @@ def get_support_from_path(path: str):
return get_support_from_data(data)
def parseImports(m):
if isinstance(m, ast.Import):
m.names = [n for n in m.names if n.name in VALIDATED_MODULES]
if len(m.names) == 0:
return None
return m
if isinstance(m, ast.ImportFrom):
if m.module not in VALIDATED_MODULES:
return None
return m
return m
def parseCode(code):
code = re.sub(
r"__(.*?)__",
"",
code
)
code.replace("open", "")
code.replace("globals", "")
return code
def parseFile(data: str):
parsed = ast.parse(data).body
parsed = [parseImports(m) for m in parsed]
parsed = [p for p in parsed if p is not None]
unparsed = ast.unparse(parsed)
return parseCode(unparsed)
async def validate_file(file: UploadFile):
data = await file.read()
data = parseFile(data)
try:
exo_supports_compatibility = get_support_from_data(
data)
@ -91,7 +149,7 @@ async def validate_file(file: UploadFile):
except HTTPException as e:
raise e
await file.seek(0)
return file
return {"file": file, "supports": exo_supports_compatibility}
async def validate_file_optionnal(file: UploadFile = None):
if not file:

View File

@ -0,0 +1,9 @@
from typing import Dict
def noteOn20(note: int, total: int):
return note * 20/total
def stripKeyDict(dict: Dict, key: str):
return {k: v for k, v in dict.items() if k != key}

View File

@ -1,21 +1,26 @@
from contextlib import contextmanager
import signal
def raise_timeout(signum, frame):
raise TimeoutError
@contextmanager
def timeout(time:int, exception = TimeoutError):
# Register a function to raise a TimeoutError on the signal.
signal.signal(signal.SIGALRM, raise_timeout)
# Schedule the signal to be sent after ``time``.
signal.alarm(time)
import multiprocessing as mp
import traceback
try:
yield
except TimeoutError:
print('TIMED OUT')
raise exception
finally:
# Unregister the signal so it won't be triggered
# if the timeout is not reached.
signal.signal(signal.SIGALRM, signal.SIG_IGN)
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
# raise e # You can still rise this exception if you need to
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception

View File

@ -69,7 +69,7 @@ class Consumer:
async def send(self, payload):
type = payload.get('type', None)
print('TYPE', type, self.member)
#print('TYPE', type, self.member)
if type is not None:
event_wrapper = self.sendings.get(type, None)
if event_wrapper is not None:
@ -85,7 +85,6 @@ class Consumer:
try:
validated_payload = model(self=self, **data)
except ValidationError as e:
print("ERROR", e)
await self.ws.send_json({"type": "error", "data": {"msg": "Oops there was an error"}})
return

View File

@ -1,14 +1,4 @@
from pydantic import *
class Model(BaseModel):
test: str = "test"
i: int
d = {'title': 'Sendtest', 'type': 'object', 'properties': {'i': {
'title': 'I', 'type': 'integer'}}, 'required': ['i'], 'additionalProperties': False}
props = {(k) for k,v in d['properties']}
obj = create_model("model", foo=(str, "str"))
print(Model(i=12, l=12))
t = "test = 12"
locs ={}
exec(t, globals(), locs)
print(locs)

View File

@ -19,11 +19,41 @@ def test_create(client: TestClient, name="test_exo", consigne="consigne", privat
assert r.status_code == 201
assert 'id_code' in r.json()
assert {**r.json(), 'id_code': None} == {'name': name, 'consigne': consigne, 'private': private, 'id_code': None, 'author': {'username': username}, 'original': None, 'tags': [], 'exo_source': 'test.py', 'supports': {
'pdf': True, 'csv': True, 'web': False}, 'examples': {'type': 'csv', 'data': [{'calcul': 'None ++', 'correction': 'None ++'}, {'calcul': 'None ++', 'correction': 'None ++'}, {'calcul': 'None ++', 'correction': 'None ++'}]}, 'is_author': True}
'pdf': False, 'csv': True, 'web': True}, 'examples': {'type': 'csv', 'data': [{'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}, {'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}, {'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}]}, 'is_author': True}
return r.json()
def test_create_too_long(client: TestClient):
def test_create_bad_import(client: TestClient, name="test_exo", consigne="consigne", private=False, user=None):
if user == None:
token = test_register(client, username="lilian")['access']
username = 'lilian'
else:
token = user['token']
username = user['username']
r = client.post('/exercices', data={"name": name, "consigne": consigne, "private": private}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_bad_import.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
print(r.json())
assert r.status_code == 422
return r.json()
def test_create_bad_use__(client: TestClient, name="test_exo", consigne="consigne", private=False, user=None):
if user == None:
token = test_register(client, username="lilian")['access']
username = 'lilian'
else:
token = user['token']
username = user['username']
r = client.post('/exercices', data={"name": name, "consigne": consigne, "private": private}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_use__.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
print(r.json())
assert r.status_code == 422
return r.json()
def test_create_invalid(client: TestClient):
token = test_register(client)['access']
@ -35,6 +65,17 @@ def test_create_too_long(client: TestClient):
'consigne_error': 'ensure this value has at most 200 characters'}
def test_create_too_long(client: TestClient):
token = test_register(client)['access']
r = client.post('/exercices', data={"name": "e", "consigne": "e", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_infinite.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
print('RESP', r.json())
assert r.status_code == 422
assert r.json()['detail'] == {'exo_source_error': 'Script took too long', }
def test_create_name_missing(client: TestClient):
token = test_register(client)['access']
@ -77,7 +118,18 @@ def test_clone(client: TestClient):
assert rr.status_code == 200
assert 'id_code' in rr.json()
assert {**rr.json(), 'id_code': None} == {'name': 'test_exo', 'consigne': 'consigne', 'private': False, 'id_code': None, 'author': {'username': 'lilian2'}, 'original': {"id_code": id_code, "name": create['name']}, 'tags': [], 'exo_source': 'test.py', 'supports': {
'pdf': True, 'csv': True, 'web': False}, 'examples': {'type': 'csv', 'data': [{'calcul': 'None ++', 'correction': 'None ++'}, {'calcul': 'None ++', 'correction': 'None ++'}, {'calcul': 'None ++', 'correction': 'None ++'}]}, 'is_author': True}
'pdf': False, 'csv': True, 'web': True}, 'examples': {'type': 'csv', 'data': [{'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}, {'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}, {'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}]}, 'is_author': True}
def test_clone_private(client: TestClient):
create = test_create(client, private=True)
id_code = create['id_code']
token = test_register(client, username="lilian2")['access']
rr = client.post('/clone/' + id_code,
headers={'Authorization': 'Bearer ' + token})
print(rr.json())
assert rr.status_code == 401
assert rr.json() == {'detail': 'Cet exercice est privé'}
def test_update(client: TestClient):
@ -90,7 +142,7 @@ def test_update(client: TestClient):
assert r.status_code == 200
assert 'id_code' in r.json()
assert r.json() == {'name': "name", 'consigne': "testconsigne", 'private': True, 'id_code': id_code, 'author': {'username': 'lilian'}, 'original': None, 'tags': [], 'exo_source': 'test2.py', 'supports': {
'pdf': True, 'csv': True, 'web': False}, 'examples': {'type': 'csv', 'data': [{'calcul': 'None ++', 'correction': 'None ++'}, {'calcul': 'None ++', 'correction': 'None ++'}, {'calcul': 'None ++', 'correction': 'None ++'}]}, 'is_author': True}
'pdf': False, 'csv': True, 'web': True}, 'examples': {'type': 'csv', 'data': [{'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}, {'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}, {'calcul': '1 + ... = 2', "correction": "1 + [1] = 2"}]}, 'is_author': True}
return r.json()
@ -176,7 +228,7 @@ def test_add_tags(client: TestClient, name='name', tags: List[Tags] = [{'label':
user = {"token": token, 'username': "lilian"}
else:
token = user['token']
exo = test_create(client, name=name, user=user)
id_code = exo['id_code']
r = client.post(f'/exercice/{id_code}/tags', json=tags,
@ -433,6 +485,7 @@ def test_get_public_auth_with_search(client: TestClient):
print(r.json())
assert r.json() == [{**public2, 'is_author': False}]
def test_get_public_no_auth(client: TestClient):
token1 = test_register(client, username="lilian")['access']
token2 = test_register(client, username="lilian2")['access']
@ -453,29 +506,110 @@ def test_get_public_no_auth(client: TestClient):
def test_get_exo_no_auth(client: TestClient):
token = test_register(client, username="lilian")['access']
exo = test_add_tags(client, user={'token': token, 'username': "lilian"})
r = client.get('/exercice/' + exo['id_code'])
assert r.json() == {**exo, "tags": [], 'is_author': False}
def test_get_exo_no_auth_private(client: TestClient):
token = test_register(client, username="lilian")['access']
exo = test_create(
client, user={'token': token, 'username': "lilian"}, private=True)
r = client.get('/exercice/' + exo['id_code'])
assert r.json() == {"detail": "Cet exercice est privé"}
def test_get_exo_auth(client: TestClient):
token = test_register(client, username="lilian")['access']
token2 = test_register(client, username="lilian2")['access']
exo = test_add_tags(client, user={'token': token, 'username': "lilian"})
r = client.get('/exercice/' + exo['id_code'],
headers={'Authorization': 'Bearer ' + token2})
print(r.json(), exo)
assert r.json() == {**exo, "tags": [], 'is_author': False}
def test_get_exo_auth_with_tags(client: TestClient):
token = test_register(client, username="lilian")['access']
exo = test_add_tags(client, user={'token': token, 'username': "lilian"})
r = client.get('/exercice/' + exo['id_code'],
headers={'Authorization': 'Bearer ' + token})
print(r.json(), exo)
assert r.json() == {**exo}
def test_get_exo_auth_private(client: TestClient):
token = test_register(client, username="lilian")['access']
token2 = test_register(client, username="lilian2")['access']
exo = test_create(
client, user={'token': token, 'username': "lilian"}, private=True)
r = client.get('/exercice/' + exo['id_code'],
headers={'Authorization': 'Bearer ' + token2})
assert r.json() == {"detail": "Cet exercice est privé"}
def test_get_tags(client: TestClient):
token = test_register(client, username="lilian")['access']
token2 = test_register(client, username="lilian2")['access']
exo = test_add_tags(client, user={'token': token, 'username': "lilian"})
exo2 = test_add_tags(client, user={'token': token2, 'username': "lilian2"})
tags1 = exo['tags']
tags2 = exo2['tags']
r = client.get('/tags', headers={'Authorization': 'Bearer ' + token2})
print(r.json())
assert r.json() == tags2
def test_get_csv(client: TestClient):
token = test_register(client)['access']
exoCsv = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_csv_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
exoPdf = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_pdf_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
exoWeb = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_web_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
r = client.get('/exercices/public', params={"type": "csv"})
assert r.json() == [{**exoCsv.json(), 'is_author': False}]
def test_get_pdf(client: TestClient):
token = test_register(client)['access']
exoCsv = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_csv_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
exoPdf = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_pdf_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
exoWeb = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_web_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
r = client.get('/exercices/public', params={"type": "pdf"})
assert r.json() == [{**exoPdf.json(), 'is_author': False}]
def test_get_web(client: TestClient):
token = test_register(client)['access']
exoCsv = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_csv_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
exoPdf = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_pdf_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
exoWeb = client.post('/exercices', data={"name": "name", "consigne": "consigne", "private": False}, files={
'file': ('test.py', open('tests/testing_exo_source/exo_source_web_only.py', 'rb'))}, headers={"Authorization": "Bearer " + token})
r = client.get('/exercices/public', params={"type": "web"})
assert r.json() == [{**exoWeb.json(), 'is_author': False}]
def test_get_invalid_type(client: TestClient):
r = client.get('/exercices/public', params={"type": "lol"})
assert r.json() == {"detail": {'type_error': "value is not a valid enumeration member; permitted: 'csv', 'pdf', 'web'"}}

File diff suppressed because it is too large Load Diff

View File

@ -1,11 +1,10 @@
import random
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
def main():
return {"csv": "None ++", 'web': None, "calcul": "1+1=2"}
t = random.randint(1,9)
return {"csv": f"1 + [1] = 2", 'pdf': None, "web": f"1 + [1] = {t}","calcul": "1+1=2"}

View File

@ -0,0 +1,12 @@
import random
import os
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
os.mkdir('testtttt')
def main():
t = random.randint(1, 10)
return {"csv": f"1 + [1] = 2", 'web': None, "calcul": "1+1=2"}

View File

@ -0,0 +1,11 @@
import random
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
def main():
t = random.randint(1, 10)
return {"csv": f"1 + [1] = 2", 'web': None,"pdf": None, "calcul": "1+1=2"}

View File

@ -0,0 +1,13 @@
import random
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
def main():
t = random.randint(1, 10)
while True:
pass
return {"csv": f"1 + [1] = 2", 'web': None, "calcul": "1+1=2"}

View File

@ -0,0 +1,11 @@
import random
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
def main():
t = random.randint(1, 10)
return {"csv": None,"pdf": "yes", 'web': None, "calcul": "1+1=2"}

View File

@ -0,0 +1,11 @@
import random
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
__import__('os').mkdir("lolilol")
def main():
t = random.randint(1, 10)
return {"csv": f"1 + [1] = 2", 'web': None, "calcul": "1+1=2"}

View File

@ -0,0 +1,11 @@
import random
"""
Fonction main() qui doit renvoyer un objet avec:
calcul: le calcul a afficher
result: la correction du calcul (pas de correction -> mettre None)
"""
def main():
t = random.randint(1, 10)
return {"csv": None, 'web': "None","pdf": None, "calcul": "1+1=2"}