Lilian937342 ebfb3ac492 websocket
2022-07-28 11:07:03 +02:00

579 lines
25 KiB
Python

from django.db.models import Q
from rest_framework.pagination import PageNumberPagination
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.core.files.base import ContentFile
from rest_framework.decorators import api_view, permission_classes
import base64
from cmath import inf
import csv
import importlib.util
import io
import os
import re
import subprocess
import tempfile
from django.http import FileResponse, HttpResponse
from django.shortcuts import render
from time import sleep
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.template.loader import get_template
import sympy
from .filters import ExosFilter
from .paginations import CustomPagination
from .pdfmaker import pdf_settings
from .models import generate_unique_code_step
from .utils import TexError, checkExoModelObject
import requests
from api.Generateur import Csv_generator, Generateur
from .models import Exercice, Tag
from .serializers import ExerciceCreateSerializer, ExerciceSerializer, TagSerializer
from users.serializers import UserSerializer
from rest_framework import permissions
# Create your views here.
class ExerciceAPI(APIView):
pagination_class = CustomPagination
def get(self, request, format=None):
steps = Exercice.objects.filter(private=False)
userExos = []
if request.user.is_authenticated:
steps = [s for s in steps if s.author.id != request.user.id]
userExos = request.user.exercice_set.all()
code = request.GET.get('id')
if code == 'pdf':
stepsListSorted = [s for s in steps if s.isPdf == True]
userExosListSorted = [s for s in userExos if s.isPdf == True]
elif code == 'web':
stepsListSorted = [s for s in steps if s.isWeb == True]
userExosListSorted = [s for s in userExos if s.isWeb == True]
if code == 'all':
stepsListSorted = steps
userExosListSorted = userExos
else:
stepsListSorted = steps
userExosListSorted = userExos
if code != 'all' and code != 'pdf' and code != "web":
try:
exo = Exercice.objects.get(id_code=code)
except:
return Response({'error': "Not found"}, status= status.HTTP_404_NOT_FOUND)
isUser = False
if request.user == exo.author:
isUser = True
return Response({"data": {**ExerciceSerializer(exo).data, "isUser": isUser, "original": exo.origin.id_code if exo.origin != None else None, "author": UserSerializer(exo.author).data if exo.author != None else None, 'exo_model': {'filename': exo.exo_model.name.split('/')[-1], "data": open(exo.exo_model.name, 'r').read()}, "tags": [{**TagSerializer(t).data, 'value': t.id_code, 'label': t.name} for t in [tt for tt in exo.tags.all() if tt.user == request.user]]}}, status=status.HTTP_200_OK)
else:
return Response({"data": {
"userExos": [{**ExerciceSerializer(ex).data, 'isUser': True, "original": ex.origin.id_code if ex.origin != None else None, "author": UserSerializer(ex.author).data if ex.author != None else None, "exo_model": {"filename": ex.exo_model.name.split('/')[-1], "data": open(
ex.exo_model.name, 'r').read()}, "tags": [{**TagSerializer(t).data, 'value': t.id_code, 'label': t.name} for t in ex.tags.all()]} for ex in userExosListSorted],
"publicList": [{**ExerciceSerializer(ex).data, 'isUser': False, 'isRegistered': len(request.user.registeredExos.filter(id_code=ex.id_code)) != 0 if request.user.is_authenticated else None, "author": UserSerializer(ex.author).data if ex.author != None else None, "exo_model": {"filename": ex.exo_model.name.split('/')[-1], "data": open(
ex.exo_model.name, 'r').read()}, "original": ex.origin.id_code if ex.origin != None else None,
"tags": [{**TagSerializer(t).data, 'value': t.id_code, 'label': t.name} for t in [tt for tt in ex.tags.all() if tt.user == request.user]]} for ex in stepsListSorted],
}}, status=status.HTTP_200_OK)
def post(self, request, format=None):
file = request.FILES['file']
name = request.data.get('name')
private = request.data.get('private')
consigne = request.data.get('consigne')
create_serializer = ExerciceCreateSerializer(
data={'exo_model': file, "consigne": consigne, "name": name, 'private': private}, context={'request': request})
if create_serializer.is_valid():
author = request.user if request.user.is_authenticated else None
new_exo = Exercice(consigne=consigne,
exo_model=create_serializer.validated_data['exo_model']['file'], name=name, author=author, private=create_serializer.validated_data['private'])
# Generateur(exo_model, 5)
new_exo.isPdf = create_serializer.validated_data['exo_model']['isPdf']
new_exo.isCsv = create_serializer.validated_data['exo_model']['isCsv']
new_exo.isWeb = create_serializer.validated_data['exo_model']['isWeb']
new_exo.save()
new_exo.exemple = {
# f'{"Csv" if new_exo.isCsv == True else ""}{"Web" if new_exo.isWeb == True else ""}',
'type': 'Csv' if new_exo.isCsv == True else 'Web' if new_exo.isWeb == True else None,
'data': Generateur(new_exo.exo_model.name, 5, 'csv' if new_exo.isCsv == True else 'web' if new_exo.isWeb == True else None, True) if new_exo.isCsv == True or new_exo.isWeb == True else None
}
new_exo.save()
# sleep(2)
return Response({"status": "200", "errors": {}, "data": {**ExerciceSerializer(new_exo).data, "author": UserSerializer(new_exo.author).data if new_exo.author != None else None, 'exo_model': {'filename': new_exo.exo_model.name.split('/')[-1], "data": open(new_exo.exo_model.name, 'r').read()}, "tags": [{**TagSerializer(t).data, 'value': t.id_code, 'label': t.name} for t in new_exo.tags.all()]}}, status=status.HTTP_200_OK)
return Response({"status": "400", "errors": create_serializer.errors}, status=status.HTTP_400_BAD_REQUEST)
@method_decorator(login_required)
def put(self, request, format=None):
file = request.FILES['file']
name = request.data.get('name')
consigne = request.data.get('consigne')
id_code = request.data.get('id_code')
private = request.data.get('private')
exo = Exercice.objects.filter(id_code=id_code)
if len(exo) == 0:
return Response({'status': "404", "data": {}},
status=status.HTTP_404_NOT_FOUND)
exo = exo[0]
if request.user != exo.author:
return Response({'status': "401", "data": {}},
status=status.HTTP_401_UNAUTHORIZED)
serializer = ExerciceCreateSerializer(
data={'exo_model': file, "consigne": consigne, "name": name, "private": private})
errors = []
if serializer.is_valid():
exo.name = name
exo.consigne = consigne
exo.exo_model = serializer.validated_data['exo_model']['file']
exo.private = serializer.validated_data['private']
exo.isPdf = serializer.validated_data['exo_model']['isPdf']
exo.isCsv = serializer.validated_data['exo_model']['isCsv']
exo.isWeb = serializer.validated_data['exo_model']['isWeb']
exo.save()
exo.exemple = {
# f'{"Csv" if new_exo.isCsv == True else ""}{"Web" if new_exo.isWeb == True else ""}',
'type': 'Csv' if exo.isCsv == True else 'Web' if exo.isWeb == True else None,
'data': Generateur(exo.exo_model.name, 5, 'csv' if exo.isCsv == True else 'web' if exo.isWeb == True else None, True) if exo.isCsv == True or exo.isWeb == True else None
}
exo.save()
return Response({"status": "200", "errors": {}, "data": ExerciceSerializer(exo).data}, status=status.HTTP_200_OK)
return Response({"status": "400", "errors": serializer.errors}, status=status.HTTP_400_BAD_REQUEST)
@method_decorator(login_required)
def delete(self, request, format=None):
id_code = request.data.get('id_code')
exo = Exercice.objects.filter(id_code=id_code)
if len(exo) == 0:
return Response({'status': "404", "data": {}},
status=status.HTTP_404_NOT_FOUND)
exo = exo[0]
if request.user != exo.author:
return Response({'status': "401", "data": {}},
status=status.HTTP_401_UNAUTHORIZED)
exo.delete()
return Response({'status': "200", "data": id_code}, status=status.HTTP_200_OK)
@api_view(['GET'])
def getPublicList(request):
''' paginator = PageNumberPagination()
paginator.page_size = 10
person_objects = Exercice.objects.all()
result_page = paginator.paginate_queryset(person_objects, request) '''
#exos = Exercice.objects.all()
# return Response({'data': ExerciceSerializer(exos[8], context={'user_id': request.user.id_code if not request.user.is_anonymous else ''}).data}, status=status.HTTP_200_OK)
paginator = CustomPagination()
paginator.page_size = 22
exos = Exercice.objects.filter(private=False).filter(original=True)
if not request.user.is_anonymous:
# [ex for ex in exos if ex.author.id != request.user.id]
exos = exos.filter(~Q(author__id=request.user.id))
code = request.query_params.get('code')
if code == 'pdf':
exos = exos.filter(isPdf=True) # [s for s in exos if s.isPdf == True]
elif code == 'web':
exos = exos.filter(isWeb=True) # [s for s in exos if s.isWeb == True]
elif code == 'csv':
exos = exos.filter(isCsv=True) # [s for s in exos if s.isCsv == True]
exos = ExosFilter(request=request.GET, queryset=exos).qs
result_page = paginator.paginate_queryset(
exos.order_by('-last_update'), request)
serializer = ExerciceSerializer(result_page, many=True, context={
'user_id': request.user.id_code if not request.user.is_anonymous else ''})
return paginator.get_paginated_response(serializer.data)
@api_view(['GET'])
@permission_classes([permissions.IsAuthenticated])
def getUserExosList(request):
paginator = CustomPagination()
paginator.page_size = 22
# Exercice.objects.filter(private=False).filter(original=True)
exos = request.user.exercice_set.all()
code = request.query_params.get('code')
if code == 'pdf':
exos = exos.filter(isPdf=True) # [s for s in exos if s.isPdf == True]
elif code == 'web':
exos = exos.filter(isWeb=True) # [s for s in exos if s.isWeb == True]
elif code == 'csv':
exos = exos.filter(isCsv=True) # [s for s in exos if s.isCsv == True]
exos = ExosFilter(request=request.GET, queryset=exos).qs
result_page = paginator.paginate_queryset(
exos.order_by('-last_update'), request)
serializer = ExerciceSerializer(result_page, many=True, context={
'user_id': request.user.id_code if not request.user.is_anonymous else ''})
return paginator.get_paginated_response(serializer.data)
@api_view(['GET'])
def getExoModelFile(request):
id_code = request.query_params['id_code']
exo = Exercice.objects.filter(id_code=id_code)
if len(exo) == 0:
return Response({'errors': 'Not found'}, status=status.HTTP_404_NOT_FOUND)
exo = exo[0]
model = exo.exo_model
with open(model.name, 'r') as f:
response = HttpResponse(f.read(), content_type='text/x-python')
response['Content-Disposition'] = f'attachment;filename={model.name.split("/")[-1]}'
return response
@api_view(['POST'])
@permission_classes([permissions.IsAuthenticated])
def fav(request):
code = request.data.get('code')
exo = Exercice.objects.filter(id_code=code)
if len(exo) == 0:
return Response({'data': {'msg': 'Not found'}}, status=status.HTTP_404_NOT_FOUND)
originExo = exo[0]
exo = exo[0]
with open(exo.exo_model.path, 'r') as f:
exo.pk = None
exo.id = None
exo.id_code = generate_unique_code_step()
exo.author = request.user
exo.original = False
exo.origin = Exercice.objects.filter(id_code=code)[0]
exo.exo_model.save(f.name.split('/')[-1], ContentFile(f.read()))
exo._state.adding = True
exo.save()
return Response({'data': {'isRegistered': False}}, status=status.HTTP_200_OK)
class TagsAPI(APIView):
@method_decorator(login_required)
def get(self, request, format=None):
tags = request.user.tag_set.all()
return Response({"data": list(map(lambda tag: {**TagSerializer(tag).data, 'label': tag.name, "value": tag.id_code}, tags))}, status=status.HTTP_200_OK)
@method_decorator(login_required)
def post(self, request, format=None):
options = request.data.get('tags')
id_code = request.data.get('step')
exo = Exercice.objects.filter(id_code=id_code)
if len(exo) == 0:
return Response({'status': "404", "data": {}},
status=status.HTTP_404_NOT_FOUND)
exo = exo[0]
tagsList = []
for o in options:
#
if o['value'].startswith('new_opt'):
newTag = Tag(name=o['label'],
color=o['color'], user=request.user)
newTag.save()
exo.tags.add(newTag)
exo.save()
tagsList.append(
{'name': o['label'], 'color': o['color'], 'id_code': newTag.id_code})
else:
tagId = o['value']
tag = request.user.tag_set.filter(id_code=tagId)
if len(tag) == 0:
return Response({'errors': ''}, status=status.HTTP_400_BAD_REQUEST)
tag = tag[0]
exo.tags.add(tag)
exo.save()
tagsList.append(
{'name': o['label'], 'color': o['color'], 'id_code': tag.id_code})
return Response({'id_code': exo.id_code, 'tags': [{**TagSerializer(t).data, 'value': t.id_code, 'label': t.name} for t in exo.tags.all() if t.user == request.user]}, status=status.HTTP_200_OK)
@method_decorator(login_required)
def delete(self, request, format=None):
tagId = request.data.get('tag')
id_code = request.data.get('step')
exo = Exercice.objects.filter(id_code=id_code)
if len(exo) == 0:
return Response({'status': "404", "data": {}},
status=status.HTTP_404_NOT_FOUND)
tag = request.user.tag_set.filter(id_code=tagId)
if len(tag) == 0:
return Response({'errors': ''}, status=status.HTTP_400_BAD_REQUEST)
tag = tag[0]
exo = exo[0]
if request.user != exo.author:
if tag.user != request.user:
return Response({'status': "401", "data": {}},
status=status.HTTP_401_UNAUTHORIZED)
exo.tags.remove(tag)
exo.save()
return Response({'id_code': exo.id_code, 'name': tag.name, 'tagId': tag.id_code, 'tags': [{**TagSerializer(t).data, 'value': t.id_code, 'label': t.name} for t in exo.tags.all() if t.user == request.user]}, status=status.HTTP_200_OK)
class Editor(APIView):
def post(self, request, format=None):
code = request.data.get('code')
if code == None:
return Response([''], status=status.HTTP_200_OK)
with tempfile.TemporaryDirectory() as tempdir:
with open(os.path.join(tempdir, "tmp.py"), 'w') as script:
script.write(code)
proc = subprocess.Popen(['python3.10', 'tmp.py', ],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=tempdir)
out, err = proc.communicate()
out = out.decode().splitlines()
return Response({"out": out, "error": err.decode('utf-8').splitlines()}, status=status.HTTP_200_OK)
class CSV_Generator(APIView):
def get(self, request, code, format=None):
ex = Exercice.objects.filter(id_code=code)[0]
if ex.isCsv == False:
return Response({'error': "Bah non en fait"}, status=status.HTTP_401_UNAUTHORIZED)
model = ex.exo_model.name
consigne = ex.consigne
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment;filename="test.csv"'
writer = csv.writer(response, delimiter=',',
quotechar=',', quoting=csv.QUOTE_MINIMAL, dialect='excel') # mettre | comme sep un jour
Csv_generator(model, 10, 10, 10,
consigne, writer)
return response
@api_view(['POST'])
def pdfGen(request):
exos = [pdf_settings(Exercice.objects.get(id_code=ex['id_code']).exo_model.name, int(ex['numberInExo']), Exercice.objects.get(id_code=ex['id_code']).consigne)
for ex in request.data.get('exos')]
for e in range(len(exos)):
for exo in range(len(exos[e]['exos'])):
#
exos[e]['exos'][exo]['calcul'] = sympy.latex(
exos[e]['exos'][exo]['calcul'])
exos[e]['exos'][exo]['calcul'] = exos[e]['exos'][exo]['calcul'].replace(
'', '\\ensuremath{\\simeq}')
exos[e]['exos'][exo]['calcul'] = exos[e]['exos'][exo]['calcul'].replace(
'', '\\ensurmath{\approx}')
exos[e]['exos'][exo]['calcul'] = exos[e]['exos'][exo]['calcul'].replace(
'', '\\ensuremath{\neq}')
exos[e]['exos'][exo]['correction'] = sympy.latex(
exos[e]['exos'][exo]['correction'])
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
'', '\\ensuremath{\\simeq}')
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
'', '\\ensurmath{\approx}')
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
'', '\\ensuremath{\neq}')
exp_list = re.findall(
r'\[([A-Za-z0-9_]+)\]', exos[e]['exos'][exo]['correction'])
red = '{red}'
for exp in exp_list:
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
f'[{exp}]', f'\\textcolor{red}{{{exp}}}')
context = {'exos': exos, 'title': request.data.get(
'title'), 'police': request.data.get('police')}
template = get_template('test.tex')
latex = template.render(context)
with tempfile.TemporaryDirectory() as tempdir:
direct = tempdir
with open(os.path.join(direct, 'tmp.tex'), 'x', encoding='utf-8') as f:
f.write(latex)
command = f'xelatex -interaction=batchmode tmp.tex'
try:
subprocess.run(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
cwd=direct)
except TexError:
raise TexError(log="./logs.txt", source=latex,
template_name='test.tex')
try:
subprocess.run(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
cwd=direct)
except TexError:
raise TexError(log='./logs.txt', source=latex,
template_name='test.tex')
with open(os.path.join(direct, 'tmp.pdf'), 'rb') as out:
pdf = out.read()
buffer = io.BytesIO(out.read())
fich_name = request.data.get('file')
if fich_name.count('.') >= 1:
ext = fich_name.split('.')[-1]
#
if ext == 'pdf':
fich_name = fich_name
elif ext != 'pdf':
#
fich_name = fich_name.replace(f'.{ext}', '.pdf')
else:
fich_name = fich_name + '.pdf'
return Response({'pdf': base64.b64encode(pdf), 'filename': fich_name}, status=status.HTTP_200_OK)
class PDF(APIView):
def post(self, request, format=None):
exos = list(map(lambda ex: pdf_settings(
Exercice.objects.filter(id_code=ex['id_code'])[0].exo_model.name, 14, int(ex['numberInExo']), int(ex['numberOfExo']), 1, ex['consigne']), request.data.get('exos')))
#
# dans str mettre space(0.5) replace by \hspace{0.5cm}
template_name = 'test.tex'
symbols = {
"": "\\approx",
'': '\\neq',
"": "\\simeq"
}
# a refaire pour supp les symbols
for e in range(len(exos)):
for exo in range(len(exos[e]['exos'])):
#
exos[e]['exos'][exo]['calcul'] = sympy.latex(
exos[e]['exos'][exo]['calcul'])
exos[e]['exos'][exo]['calcul'] = exos[e]['exos'][exo]['calcul'].replace(
'', '\\ensuremath{\\simeq}')
exos[e]['exos'][exo]['calcul'] = exos[e]['exos'][exo]['calcul'].replace(
'', '\\ensurmath{\approx}')
exos[e]['exos'][exo]['calcul'] = exos[e]['exos'][exo]['calcul'].replace(
'', '\\ensuremath{\neq}')
exos[e]['exos'][exo]['correction'] = sympy.latex(
exos[e]['exos'][exo]['correction'])
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
'', '\\ensuremath{\\simeq}')
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
'', '\\ensurmath{\approx}')
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
'', '\\ensuremath{\neq}')
exp_list = re.findall(
r'\[([A-Za-z0-9_]+)\]', exos[e]['exos'][exo]['correction'])
red = '{red}'
for exp in exp_list:
exos[e]['exos'][exo]['correction'] = exos[e]['exos'][exo]['correction'].replace(
f'[{exp}]', f'\\textcolor{red}{{{exp}}}')
context = {'exos': exos, 'title': request.data.get(
'title'), 'police': request.data.get('police')}
template = get_template('test.tex')
latex = template.render(context) # latex to compile
#
with tempfile.TemporaryDirectory() as tempdir:
direct = tempdir
with open(os.path.join(direct, 'tmp.tex'), 'x', encoding='utf-8') as f:
f.write(latex)
command = f'xelatex -interaction=batchmode tmp.tex'
try:
subprocess.run(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
cwd=direct)
except TexError:
raise TexError(log=log, source=latex,
template_name='test.tex')
try:
subprocess.run(command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
cwd=direct)
except TexError:
raise TexError(log=log, source=latex,
template_name='test.tex')
with open(os.path.join(direct, 'tmp.pdf'), 'rb') as out:
pdf = out.read()
buffer = io.BytesIO(out.read())
fich_name = request.data.get('file')
if fich_name.count('.') >= 1:
ext = fich_name.split('.')[-1]
#
if ext == 'pdf':
fich_name = fich_name
elif ext != 'pdf':
#
fich_name = fich_name.replace(f'.{ext}', '.pdf')
else:
fich_name = fich_name + '.pdf'
return Response({'pdf': base64.b64encode(pdf), 'filename': fich_name}, status=status.HTTP_200_OK)
''' return PDFResponse(base64.b64encode(pdf), filename=fich_name)
subprocess.run('ls', cwd=direct)
'''
class Test(APIView):
def get(self, request, format=None):
code = "VZRKLZ"
ex = Exercice.objects.filter(id_code=code)[0]
model = ex.exo_model
oplist = Generateur(model.name, 2, 'calcul')
return Response({"data": oplist}, status=status.HTTP_200_OK)
class ExoModelApi(APIView):
def get(self, request, format=None):
code = request.GET.get('id_code')
exo = Exercice.objects.filter(id_code=code)[0]
model = open(exo.exo_model.name, 'rb')
return FileResponse(model, status=status.HTTP_200_OK)