generateur_v3/backend/api/services/exoValidation.py
2023-02-28 15:01:08 +01:00

162 lines
4.6 KiB
Python

import ast
import math
import os
import random
import re
import string
from multiprocessing import Manager
import sympy
from fastapi import UploadFile, status
from fastapi.exceptions import HTTPException
from tortoise.exceptions import ValidationError
from services.io import is_binary_file
from services.timeout import Process
VALIDATED_MODULES = ["random", "string", "sympy", "math"]
def checkExoSupportCompatibility(obj):
isPdf = False if (obj['pdf'] == None or (
obj['calcul'] == False and obj['pdf'] == False)) else True
isCsv = False if (obj['csv'] == None or (
obj['calcul'] == False and obj['csv'] == False)) else True
isWeb = False if (obj['web'] == None or (
obj['calcul'] == False and obj['web'] == False)) else True
return {
'pdf': isPdf, 'csv': isCsv, 'web': isWeb}
def get_module_from_string(value: str, *args, **kwargs):
locs = {}
try:
exec(value, {"random": random, "string": string, "sympy": sympy, "math": math}, locs)
except Exception as err:
raise ValueError(err.args[0])
return locs
def execute_main_if_avalaible(spec, *args, **kwargs):
try:
return spec["main"]()
except KeyError as atrerror:
raise ValueError(f"Fonction 'main' introuvable")
except Exception as e:
raise ValueError(f'[Error] : {e}')
def get_spec_with_timeout(data, time):
return get_module_from_string(data)
''' with timeout(time, ValidationError('[Error] : Script took too long')):
return get_module_from_string(data) '''
def fill_empty_values(object):
default_object = {"calcul": False, 'pdf': False, 'csv': False,
'web': False, 'correction': False}
return {**default_object, **object}
def get_results(data, resultObj):
locs = get_spec_with_timeout(data, 5)
result = execute_main_if_avalaible(locs)
result = fill_empty_values(result)
resultObj['result'] = result
def get_result_with_timeout(data):
m = Manager()
result = m.dict()
p = Process(target=get_results, args=(data, result))
p.start()
p.join(timeout=3)
p.terminate()
if p.exception:
error, traceback = p.exception
raise ValueError(error.args[0])
return result.get('result', None)
def get_support_from_data(data: str):
result = get_result_with_timeout(data)
if result is None:
raise ValueError('Script took too long')
exo_supports_compatibility = checkExoSupportCompatibility(result)
return exo_supports_compatibility
def get_support_from_path(path: str):
if not os.path.exists(path):
raise ValidationError('[Error] : No such file or directory')
is_binary = is_binary_file(path)
if is_binary:
mode = 'rb'
else:
mode = 'r'
with open(path, mode) as f:
data = f.read() if mode == "r" else f.read().decode('utf8')
return get_support_from_data(data)
def parseImports(m):
if isinstance(m, ast.Import):
m.names = [n for n in m.names if n.name in VALIDATED_MODULES]
if len(m.names) == 0:
return None
return m
if isinstance(m, ast.ImportFrom):
if m.module not in VALIDATED_MODULES:
return None
return m
return m
def parseCode(code):
code = re.sub(
r"__(.*?)__",
"",
code
)
code.replace("open", "")
code.replace("globals", "")
return code
def parseFile(data: str):
parsed = ast.parse(data).body
parsed = [parseImports(m) for m in parsed]
parsed = [p for p in parsed if p is not None]
unparsed = ast.unparse(parsed)
return parseCode(unparsed)
async def validate_file(file: UploadFile):
data = await file.read()
data = parseFile(data)
try:
exo_supports_compatibility = get_support_from_data(
data)
if not exo_supports_compatibility['pdf'] and not exo_supports_compatibility['csv'] and not exo_supports_compatibility['web']:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={"exo_source_error":
'Exercice non valide (compatible avec aucun support)'})
except ValueError as e:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={
"exo_source_error": e.args[0]})
except HTTPException as e:
raise e
await file.seek(0)
return {"file": file, "supports": exo_supports_compatibility}
async def validate_file_optionnal(file: UploadFile = None):
if not file:
return None
return await validate_file(file)