261 lines
9.6 KiB
Python
261 lines
9.6 KiB
Python
from datetime import datetime
|
|
from email.policy import default
|
|
import functools
|
|
from pyexpat import model
|
|
import random
|
|
import string
|
|
from django.db import models
|
|
import pytz
|
|
from .utils import getNow
|
|
|
|
from users.models import CustomUser
|
|
# Create your models here.
|
|
|
|
|
|
def generate_unique_code_room():
    """Return a random 6-letter uppercase code that no ``Room`` uses yet.

    Candidates are drawn with ``random.choices`` from the ASCII uppercase
    letters until one is found whose ``id_code`` is not already present in
    the ``Room`` table. The function is referenced as the callable default
    of ``Room.id_code``.

    Returns:
        str: the first candidate code that is not taken.
    """
    length = 6
    while True:
        code = ''.join(random.choices(string.ascii_uppercase, k=length))
        # exists() lets the database stop at the first hit instead of
        # counting every matching row just to compare the count with zero.
        if not Room.objects.filter(id_code=code).exists():
            return code
|
|
|
|
|
|
def generate_unique_code_parcours():
    """Return a random 6-letter uppercase code that no ``Parcours`` uses yet.

    Candidates are drawn with ``random.choices`` from the ASCII uppercase
    letters until one is found whose ``id_code`` is not already present in
    the ``Parcours`` table. The function is referenced as the callable
    default of ``Parcours.id_code``.

    Returns:
        str: the first candidate code that is not taken.
    """
    length = 6
    while True:
        code = ''.join(random.choices(string.ascii_uppercase, k=length))
        # exists() lets the database stop at the first hit instead of
        # counting every matching row just to compare the count with zero.
        if not Parcours.objects.filter(id_code=code).exists():
            return code
|
|
|
|
|
|
def generate_unique_code_corr():
    """Return a random 6-letter uppercase code no ``TempCorrection`` uses yet.

    Candidates are drawn with ``random.choices`` from the ASCII uppercase
    letters until one is found whose ``id_code`` is not already present in
    the ``TempCorrection`` table. The function is referenced as the callable
    default of ``TempCorrection.id_code``.

    Returns:
        str: the first candidate code that is not taken.
    """
    length = 6
    while True:
        code = ''.join(random.choices(string.ascii_uppercase, k=length))
        # exists() lets the database stop at the first hit instead of
        # counting every matching row just to compare the count with zero.
        if not TempCorrection.objects.filter(id_code=code).exists():
            return code
|
|
|
|
|
|
class RoomManager(models.Manager):
    """Manager grouping the membership, waiting-list and presence updates of a room.

    Every public method looks the room up by its ``id_code``. Except for
    ``disconnect`` (which reports failure by returning ``False``), a lookup
    that matches no room raises ``IndexError``.
    """

    def _get_room(self, id_code):
        """Return the first room whose ``id_code`` matches (IndexError if none)."""
        return self.get_queryset().filter(id_code=id_code)[0]

    @staticmethod
    def _new_code(taken):
        """Return a random 6-letter uppercase code that is not in ``taken``."""
        while True:
            code = ''.join(random.choices(string.ascii_uppercase, k=6))
            if code not in taken:
                return code

    @staticmethod
    def _remove_challenger(room, code):
        """Drop the challenger entry carrying ``code`` from every parcours of ``room``."""
        for parc in room.parcours_set.all():
            parc.challenger = [c for c in parc.challenger if c['code'] != code]
            parc.save()

    def add_participant(self, room, new_name, clientId, owner, online):
        """Register an anonymous participant in a room and mark it online.

        Args:
            room: ``id_code`` of the target room.
            new_name: nickname requested by the participant.
            clientId: value stored under the ``'clientId'`` key.
            owner: value stored under the ``"owner"`` key.
            online: accepted for signature compatibility; not used here.

        Returns:
            The new participant dict (``nick``, ``code``, ``clientId``,
            ``owner``), or ``None`` when the nickname is already used by an
            anonymous member of the room.
        """
        room = self._get_room(room)
        # The field is declared null=True, so a stored NULL is treated as "no members".
        participants = room.anonymousMembers or []
        if new_name in {p['nick'] for p in participants}:
            return None
        code = self._new_code({p['code'] for p in participants})
        new_participant = {'nick': new_name, 'code': code,
                           'clientId': clientId, "owner": owner}
        room.online = [*room.online, code]
        room.anonymousMembers = [*participants, new_participant]
        room.save()
        return new_participant

    def add_user(self, room, code, owner, online):
        """Attach a registered user to a room and mark it online.

        Args:
            room: ``id_code`` of the target room.
            code: ``id_code`` of the ``CustomUser`` to attach.
            owner: value stored under the ``"owner"`` key of the result.
            online: value stored under the ``"online"`` key of the result.

        Returns:
            A participant dict built from the user (``nick``, ``code``,
            ``clientId``, ``owner``, ``online``), or ``None`` when no user
            has that ``id_code``.
        """
        room = self._get_room(room)
        try:
            user = CustomUser.objects.filter(id_code=code)[0]
        except IndexError:
            return None

        room.userMembers.add(user)
        new_participant = {'nick': user.username, 'code': user.id_code,
                           'clientId': user.clientId, "owner": owner,
                           "online": online}
        room.online = [*room.online, user.id_code]
        room.save()
        return new_participant

    def del_user(self, room, code):
        """Detach a registered user from a room and from its parcours challengers.

        Returns:
            ``code`` on success, or ``None`` when no user has that ``id_code``.
        """
        room = self._get_room(room)
        try:
            user = CustomUser.objects.filter(id_code=code)[0]
        except IndexError:
            return None
        room.userMembers.remove(user)
        self._remove_challenger(room, code)
        room.save()
        return code

    def del_participant(self, room, code):
        """Remove an anonymous participant from a room and from its parcours challengers.

        Returns:
            ``code``, whether or not a participant carried it.
        """
        room = self._get_room(room)
        participants = room.anonymousMembers or []
        room.anonymousMembers = [p for p in participants if p['code'] != code]
        self._remove_challenger(room, code)
        room.save()
        return code

    def add_waiter(self, room, new_name, isUser, code=None):
        """Append an entry to the room's waiting list.

        Args:
            room: ``id_code`` of the target room.
            new_name: nickname of the waiter.
            isUser: whether the waiter is a registered user.
            code: code to store for the waiter; when ``None`` a random code
                not used by any current waiter is generated. A supplied code
                is stored as-is, without a uniqueness check.

        Returns:
            The new waiter dict (``nick``, ``code``, ``status``), or ``None``
            when the nickname is rejected.
        """
        room = self._get_room(room)

        participants = room.anonymousMembers or []
        # NOTE(review): a clash with an anonymous member's nick is only
        # rejected when isUser is truthy; kept as written - confirm intent.
        if new_name in {p['nick'] for p in participants} and isUser:
            return None

        waiters = room.waiters
        if new_name in {w['nick'] for w in waiters}:
            return None

        if code is None:
            code = self._new_code({w['code'] for w in waiters})
        # Equality with True is kept on purpose: any other value, even a
        # truthy one such as a non-empty string, yields 'anonymous'.
        status = 'user' if isUser == True else 'anonymous'
        new_waiter = {'nick': new_name, 'code': code, "status": status}
        room.waiters = [*waiters, new_waiter]
        room.save()
        return new_waiter

    def del_waiter(self, room, code):
        """Remove every waiting-list entry carrying ``code``; always returns ``True``."""
        room = self._get_room(room)
        room.waiters = [w for w in room.waiters if w['code'] != code]
        room.save()
        return True

    def disconnect(self, room_code, code):
        """Remove every occurrence of ``code`` from the room's online list.

        Returns:
            ``True`` when the room was updated, ``False`` when the lookup or
            the save raised.
        """
        try:
            room = self._get_room(room_code)
            room.online = [o for o in room.online if o != code]
            room.save()
            return True
        except Exception:
            # Still best-effort, but SystemExit / KeyboardInterrupt are no
            # longer swallowed as they were by the former bare ``except``.
            return False

    def connect(self, room_code, code):
        """Append ``code`` to the room's online list; always returns ``True``."""
        room = self._get_room(room_code)
        room.online = [*room.online, code]
        room.save()
        return True

    def connectUser(self, room_code, code):
        """Append a user's ``code`` to the room's online list (same steps as ``connect``)."""
        return self.connect(room_code, code)
|
|
|
|
|
|
class Room(models.Model):
    """Room with anonymous members, registered members, a waiting list and an online list."""

    name = models.CharField(max_length=30)

    # For the initial migration: if the default is not "" an error is raised,
    # because the table is used inside the function before it is created.
    id_code = models.CharField(
        unique=True,
        max_length=50,
        default=generate_unique_code_room,
    )

    # List of participant dicts as written by RoomManager.add_participant
    # (keys 'nick', 'code', 'clientId', 'owner'); the column is nullable.
    anonymousMembers = models.JSONField(null=True, default=list)

    # Registered users attached to the room.
    userMembers = models.ManyToManyField("users.CustomUser")

    # List of waiter dicts as written by RoomManager.add_waiter
    # (keys 'nick', 'code', 'status').
    waiters = models.JSONField(default=list)

    created_at = models.DateTimeField(auto_now_add=True)

    # Free-form JSON object; its keys are not defined in this module.
    owner = models.JSONField(default=dict)

    public_result = models.BooleanField(default=False)

    # Codes appended by RoomManager.connect / add_participant / add_user
    # and removed by RoomManager.disconnect.
    online = models.JSONField(default=list)

    private = models.BooleanField(default=True)

    objects = RoomManager()
|
|
|
|
|
|
class ParcoursManager(models.Manager):
    """Manager recording challenge attempts on a parcours and averaging their notes."""

    @staticmethod
    def _average(exos):
        """Return the mean note of ``exos`` and the total of its first entry.

        Args:
            exos: non-empty list of attempt dicts, each holding a ``'note'``
                dict with ``'value'`` and ``'total'``.

        Returns:
            ``{'value': mean rounded to 2 decimals, 'total': exos[0] total}``.
        """
        values = [e['note']['value'] for e in exos]
        return {
            'value': round(sum(values) / len(values), 2),
            'total': exos[0]['note']['total'],
        }

    def challenge(self, parcours_code, user_code, result, isUser):
        """Store one attempt of a challenger and recompute the validation flag.

        Args:
            parcours_code: ``id_code`` of the parcours.
            user_code: code of the challenger (anonymous member code or
                ``CustomUser.id_code``).
            result: attempt dict; must hold ``result['note']['value']`` and
                ``result['note']['total']``. It is stored with two extra
                keys, ``'endAt'`` and ``'code'``.
            isUser: truthy when the challenger is a registered user, falsy
                when it is an anonymous member of the parcours' room.

        Returns:
            ``{'validate': bool, 'code': str}`` where ``code`` identifies the
            stored attempt.

        Raises:
            IndexError: when the parcours, the anonymous member or the user
                cannot be found.
        """
        parcours = self.get_queryset().filter(id_code=parcours_code)[0]
        previous = [c for c in parcours.challenger if c['code'] == user_code]

        if not isUser:
            # anonymousMembers is nullable on Room; NULL is treated as empty.
            members = parcours.room.anonymousMembers or []
            challenger = [p for p in members if p['code'] == user_code][0]
        else:
            user = CustomUser.objects.filter(id_code=user_code)[0]
            challenger = {'nick': user.username, "code": user.id_code,
                          'clientId': user.clientId}

        # Build the aware timestamp directly instead of converting a naive
        # local datetime afterwards.
        date = str(datetime.now(pytz.timezone('Europe/Berlin')))
        condition = parcours.success_condition

        if not previous:
            # First attempt: validation depends on this result alone.
            validate = (result['note']['value'] * 20
                        / result['note']['total']) >= condition
            code = ''.join(random.choices(string.ascii_uppercase, k=6))
            attempts = [{**result, 'endAt': date, 'code': code}]
            parcours.challenger = [
                *parcours.challenger,
                {**challenger, 'exos': attempts, "validate": validate},
            ]
        else:
            exos = previous[0]['exos']

            # Draw a code no earlier attempt of this challenger already uses.
            used_codes = {e['code'] for e in exos}
            while True:
                code = ''.join(random.choices(string.ascii_uppercase, k=6))
                if code not in used_codes:
                    break

            if len(exos) > 6:
                # Keep the detailed 'result' of the six latest stored
                # attempts only; older ones are emptied when their note is
                # flagged isTrust.
                cutoff = len(exos) - 6
                exos = [
                    {**e, 'result': [] if i < cutoff and e['note']['isTrust'] == True else e['result']}
                    for i, e in enumerate(exos)
                ]

            attempts = [*exos, {**result, 'endAt': date, "code": code}]
            # The average now includes the attempt being submitted. It was
            # previously computed from the stored attempts only, so the
            # returned flag lagged one submission behind.
            moyenne = self._average(attempts)
            validate = moyenne['value'] * 20 / moyenne['total'] >= condition

            others = [c for c in parcours.challenger
                      if c['code'] != challenger['code']]
            parcours.challenger = [
                *others,
                {**challenger, 'exos': attempts, "validate": validate},
            ]

        parcours.save()
        return {'validate': validate, 'code': code}

    def getAverage(self, parcours_code, user_code):
        """Return the average note of a challenger on a parcours.

        Returns:
            ``{'value': mean note rounded to 2 decimals, 'total': total of
            the first attempt}``, or ``None`` when the parcours has no
            challenger entry with ``user_code``.

        Raises:
            IndexError: when no parcours has ``parcours_code``.
        """
        parcours = self.get_queryset().filter(id_code=parcours_code)[0]
        for entry in parcours.challenger:
            if entry['code'] == user_code:
                return self._average(entry['exos'])
        return None
|
|
|
|
|
|
class Parcours(models.Model):
    """Parcours attached to a room, with its exercices and its challengers' attempts."""

    name = models.CharField(max_length=30)

    # For the initial migration: if the default is not "" an error is raised,
    # because the table is used inside the function before it is created.
    id_code = models.CharField(
        unique=True,
        max_length=50,
        default=generate_unique_code_parcours,
    )

    # List of challenger dicts as written by ParcoursManager.challenge
    # (member keys plus 'exos' and 'validate').
    challenger = models.JSONField(default=list)

    # Deleting the room deletes its parcours.
    room = models.ForeignKey(Room, on_delete=models.CASCADE)

    created_at = models.DateTimeField(auto_now_add=True)

    # NOTE(review): blank=True without null=True or a default - confirm an
    # empty value is never actually saved for this integer column.
    timer = models.IntegerField(blank=True)

    exercices = models.JSONField(default=list)

    # https://docs.djangoproject.com/fr/4.0/ref/validators/
    # Threshold compared by ParcoursManager.challenge with a note scaled to 20.
    success_condition = models.IntegerField(default=10)

    objects = ParcoursManager()
|
|
|
|
|
|
class TempCorrection(models.Model):
    """Correction stored as JSON and addressed by a unique generated code."""

    correction = models.JSONField(default=list)

    # For the initial migration: if the default is not "" an error is raised,
    # because the table is used inside the function before it is created.
    id_code = models.CharField(
        unique=True,
        max_length=50,
        default=generate_unique_code_corr,
    )
|