47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
|
|
import io
|
|
import os
|
|
from typing import IO, Type
|
|
|
|
from services.io import add_fast_api_root, get_filename, get_or_create_dir, remove_fastapi_root
|
|
|
|
|
|
class FileFieldMeta(type):
|
|
def __getitem__(self, upload_root: str,) -> Type['FileField']:
|
|
return type('MyFieldValue', (FileField,), {'upload_root': upload_root})
|
|
|
|
|
|
class FileField(str, metaclass=FileFieldMeta):
|
|
upload_root: str
|
|
|
|
@classmethod
|
|
def __get_validators__(cls):
|
|
yield cls.validate
|
|
|
|
@classmethod
|
|
def validate(cls, value: str | IO, values):
|
|
upload_root = get_or_create_dir(
|
|
add_fast_api_root(cls.upload_root))
|
|
if not isinstance(value, str):
|
|
value.seek(0)
|
|
|
|
is_binary = isinstance(value, io.BytesIO)
|
|
|
|
name = get_filename(value, 'exo_source.py')
|
|
|
|
parent = get_or_create_dir(os.path.join(
|
|
upload_root, values['id_code']))
|
|
|
|
mode = 'w+' if not is_binary else 'wb+'
|
|
|
|
path = os.path.join(parent, name)
|
|
with open(path, mode) as f:
|
|
f.write(value.read())
|
|
|
|
return remove_fastapi_root(path)
|
|
|
|
else:
|
|
if not os.path.exists(add_fast_api_root(value)):
|
|
raise ValueError('File does not exist')
|
|
return value
|