import ast from multiprocessing import Manager import random import re import string from fastapi.exceptions import HTTPException from fastapi import UploadFile, status import sympy from tortoise.validators import Validator from tortoise.exceptions import ValidationError import types from services.timeout import Process from services.io import is_binary_file import os VALIDATED_MODULES = ["random", "string", "sympy"] def checkExoSupportCompatibility(obj): isPdf = False if (obj['pdf'] == None or ( obj['calcul'] == False and obj['pdf'] == False)) else True isCsv = False if (obj['csv'] == None or ( obj['calcul'] == False and obj['csv'] == False)) else True isWeb = False if (obj['web'] == None or ( obj['calcul'] == False and obj['web'] == False)) else True return { 'pdf': isPdf, 'csv': isCsv, 'web': isWeb} def get_module_from_string(value: str, *args, **kwargs): locs = {} try: exec(value, {"random": random, "string": string, "sympy": sympy}, locs) except Exception as err: raise ValueError(err.args[0]) return locs def execute_main_if_avalaible(spec, *args, **kwargs): try: return spec["main"]() except KeyError as atrerror: raise ValueError(f"Fonction 'main' introuvable") except Exception as e: raise ValueError(f'[Error] : {e}') def get_spec_with_timeout(data, time): return get_module_from_string(data) ''' with timeout(time, ValidationError('[Error] : Script took too long')): return get_module_from_string(data) ''' def fill_empty_values(object): default_object = {"calcul": False, 'pdf': False, 'csv': False, 'web': False, 'correction': False} return {**default_object, **object} def get_results(data, resultObj): locs = get_spec_with_timeout(data, 5) result = execute_main_if_avalaible(locs) result = fill_empty_values(result) resultObj['result'] = result def get_result_with_timeout(data): m = Manager() result = m.dict() p = Process(target=get_results, args=(data, result)) p.start() p.join(timeout=3) p.terminate() if p.exception: error, traceback = p.exception raise ValueError(error.args[0]) return result.get('result', None) def get_support_from_data(data: str): result = get_result_with_timeout(data) if result is None: raise ValueError('Script took too long') exo_supports_compatibility = checkExoSupportCompatibility(result) return exo_supports_compatibility def get_support_from_path(path: str): if not os.path.exists(path): raise ValidationError('[Error] : No such file or directory') is_binary = is_binary_file(path) if is_binary: mode = 'rb' else: mode = 'r' with open(path, mode) as f: data = f.read() if mode == "r" else f.read().decode('utf8') return get_support_from_data(data) def parseImports(m): if isinstance(m, ast.Import): m.names = [n for n in m.names if n.name in VALIDATED_MODULES] if len(m.names) == 0: return None return m if isinstance(m, ast.ImportFrom): if m.module not in VALIDATED_MODULES: return None return m return m def parseCode(code): code = re.sub( r"__(.*?)__", "", code ) code.replace("open", "") code.replace("globals", "") return code def parseFile(data: str): parsed = ast.parse(data).body parsed = [parseImports(m) for m in parsed] parsed = [p for p in parsed if p is not None] unparsed = ast.unparse(parsed) return parseCode(unparsed) async def validate_file(file: UploadFile): data = await file.read() data = parseFile(data) try: exo_supports_compatibility = get_support_from_data( data) if not exo_supports_compatibility['pdf'] and not exo_supports_compatibility['csv'] and not exo_supports_compatibility['web']: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={"exo_source_error": 'Exercice non valide (compatible avec aucun support)'}) except ValueError as e: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={ "exo_source_error": e.args[0]}) except HTTPException as e: raise e await file.seek(0) return {"file": file, "supports": exo_supports_compatibility} async def validate_file_optionnal(file: UploadFile = None): if not file: return None return await validate_file(file)