Compare commits

..

4 Commits

Author SHA1 Message Date
Quentin Guilloteau
ef257e6026 add prefix to config 2024-07-21 16:14:58 +02:00
Quentin Guilloteau
566eb778f2 fix execution on grid5000 from namespace 2024-07-21 16:13:52 +02:00
Quentin Guilloteau
318955865c typo 2024-07-21 16:12:40 +02:00
Quentin Guilloteau
dd6eb9bcbb now using tmp dir to decompress the artifact 2024-07-21 16:11:55 +02:00
5 changed files with 37 additions and 25 deletions

View File

@ -3,10 +3,12 @@ folder_artifacts_json: "artifacts/json"
folder_blacklists: "blacklists" folder_blacklists: "blacklists"
symlink_blacklist: "blacklist.csv" symlink_blacklist: "blacklist.csv"
system: "g5k" # can be "local" for local execution system: "g5k" # can be "local" for local execution
prefix: "outputs"
site: "grenoble" site: "grenoble"
cluster: "dahu" cluster: "dahu"
max_duration: 60 # 1 hour max_duration: 60 # 1 hour
checkpoint: 1 # 1 minute checkpoint: 1 # 1 minute
besteffort: true besteffort: True
sleep_time: 300 # 5 minutes #sleep_time: 300 # 5 minutes
sleep_time: 30 # 0.5 minutes

17
ecg.py
View File

@ -105,7 +105,7 @@ def download_file(url, dest):
hash_process = subprocess.run(f"sha256sum {file.name} | cut -d ' ' -f 1 | tr -d '\n'", capture_output=True, shell=True) hash_process = subprocess.run(f"sha256sum {file.name} | cut -d ' ' -f 1 | tr -d '\n'", capture_output=True, shell=True)
return hash_process.stdout.decode("utf-8") return hash_process.stdout.decode("utf-8")
def download_sources(config): def download_sources(config, tmp_dir):
""" """
Downloads the source of the artifact in 'config'. Downloads the source of the artifact in 'config'.
@ -121,7 +121,7 @@ def download_sources(config):
""" """
url = config["artifact_url"] url = config["artifact_url"]
artifact_name = trim(url) artifact_name = trim(url)
artifact_dir = os.path.join(cachedir_path, artifact_name) artifact_dir = os.path.join(tmp_dir, artifact_name)
# Checking if artifact in cache. Not downloading if it is: # Checking if artifact in cache. Not downloading if it is:
if not os.path.exists(artifact_dir) or not use_cache: if not os.path.exists(artifact_dir) or not use_cache:
logging.info(f"Downloading artifact from {url}") logging.info(f"Downloading artifact from {url}")
@ -148,7 +148,7 @@ def download_sources(config):
logging.info(f"Cache found for {url}, skipping download") logging.info(f"Cache found for {url}, skipping download")
return artifact_dir return artifact_dir
def buildstatus_saver(output): def buildstatus_saver(output, buildstatus_path):
""" """
Parses the given 'output' to indentify the errors, then saves them to the Parses the given 'output' to indentify the errors, then saves them to the
'build_status' file. 'build_status' file.
@ -347,13 +347,16 @@ def main():
# print(config) # print(config)
config_file.close() config_file.close()
src_dir = download_sources(config) tmp_dir = tempfile.TemporaryDirectory()
return_code, build_output = build_image(config, src_dir) artifact_dir = download_sources(config, tmp_dir.name)
return_code, build_output = build_image(config, artifact_dir)
if return_code == 0: if return_code == 0:
check_env(config, src_dir) check_env(config, artifact_dir)
remove_image(config) remove_image(config)
pathlib.Path(buildstatus_path).touch()
else: else:
buildstatus_saver(build_output) pathlib.Path(pkglist_path).touch()
buildstatus_saver(build_output, buildstatus_path)
if not use_cache: if not use_cache:
os.system(f"rm -rf {os.path.join(cachedir_path, trim(config['artifact_url']))}") os.system(f"rm -rf {os.path.join(cachedir_path, trim(config['artifact_url']))}")

View File

@ -2,6 +2,7 @@ configfile: "config/config.yaml"
include: "utils.smk" include: "utils.smk"
import os
import datetime import datetime
DATE = datetime.datetime.now().strftime("%Y%m%d") DATE = datetime.datetime.now().strftime("%Y%m%d")
@ -11,17 +12,18 @@ BLACKLIST_FOLDER = config["folder_blacklists"]
BLACKLIST = config["symlink_blacklist"] BLACKLIST = config["symlink_blacklist"]
EXTENSION = "json" EXTENSION = "json"
SYSTEM = config["system"] SYSTEM = config["system"]
PREFIX = config["prefix"]
ARTIFACTS = get_artifacts_to_build(ARTIFACTS_FOLDER_NICKEL, BLACKLIST) ARTIFACTS = get_artifacts_to_build(ARTIFACTS_FOLDER_NICKEL, BLACKLIST)
rule all: rule all:
input: input:
expand("{folder}/{artifact}/{date}.csv",\ expand(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.csv",\
folder=["pkgs", "build_status", "artifact_hash"],\ folder=["pkgs", "build_status", "artifact_hash"],\
artifact=ARTIFACTS,\ artifact=ARTIFACTS,\
date=DATE date=DATE
), ),
expand("{folder}/{artifact}/{date}.txt",\ expand(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.txt",\
folder=["logs"],\ folder=["logs"],\
artifact=ARTIFACTS,\ artifact=ARTIFACTS,\
date=DATE date=DATE
@ -48,7 +50,7 @@ rule check_artifact:
SHELLS_ECG = { SHELLS_ECG = {
"local": f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}", "local": f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}",
"g5k": f"python3 {{input.execo_wrapper}} --site {config['site']} --cluster {config['cluster']} --max_duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- " "g5k": f"python3 {{input.execo_wrapper}} --path {os.getcwd()} --script {{input.oar_wrapper}} --site {config['site']} --cluster {config['cluster']} --max-duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- '"
} }
rule run_ecg: rule run_ecg:
@ -60,19 +62,19 @@ rule run_ecg:
oar_wrapper="workflow/scripts/ecg_wrapper.oar.bash", oar_wrapper="workflow/scripts/ecg_wrapper.oar.bash",
artifact=f"{ARTIFACTS_FOLDER_JSON}/{{artifact}}.{EXTENSION}" artifact=f"{ARTIFACTS_FOLDER_JSON}/{{artifact}}.{EXTENSION}"
output: output:
log = "logs/{artifact}/{date}.txt", log = f"{PREFIX}/logs/{{artifact}}/{{date}}.txt",
pkg = "pkgs/{artifact}/{date}.csv", pkg = f"{PREFIX}/pkgs/{{artifact}}/{{date}}.csv",
build_status = "build_status/{artifact}/{date}.csv", build_status = f"{PREFIX}/build_status/{{artifact}}/{{date}}.csv",
artifact_hash = "artifact_hash/{artifact}/{date}.csv" artifact_hash = f"{PREFIX}/artifact_hash/{{artifact}}/{{date}}.csv"
shell: shell:
(SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"] (SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"] + ("'" if SYSTEM == "g5k" else "")
rule update_blacklist: rule update_blacklist:
input: input:
BLACKLIST, BLACKLIST,
build_status=expand("build_status/{artifact}/{{date}}.csv",\ build_status=expand(f"{PREFIX}/build_status/{{artifact}}/{{{{date}}}}.csv",\
artifact=ARTIFACTS) artifact=ARTIFACTS)
output: output:
f"{BLACKLIST_FOLDER}/{{date}}.csv" f"{BLACKLIST_FOLDER}/{{date}}.csv"
shell: shell:
f"cat {{input}} > {{output}} && ln -s {{output}} {BLACKLIST}" f"cat {{input}} > {{output}} && rm -rf {BLACKLIST} && ln -s {{output}} {BLACKLIST}"

4
workflow/scripts/ecg_wrapper.oar.bash Normal file → Executable file
View File

@ -14,4 +14,8 @@ handler() {
} }
trap handler SIGUSR2 trap handler SIGUSR2
cd $1
shift
nix develop --command $@ nix develop --command $@

View File

@ -2,7 +2,7 @@ from execo_g5k import oardel, oarsub, OarSubmission, wait_oar_job_start, get_oar
import time import time
import argparse import argparse
def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path_to_script, command): def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path, script, command):
reservation_duration = (maximum_duration_minutes + checkpoint_minutes) * 60 reservation_duration = (maximum_duration_minutes + checkpoint_minutes) * 60
checkpoint = checkpoint_minutes * 60 checkpoint = checkpoint_minutes * 60
job_type = [] job_type = []
@ -13,13 +13,13 @@ def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_b
reservation_duration,\ reservation_duration,\
job_type=job_type,\ job_type=job_type,\
additional_options=f"--checkpoint {checkpoint}",\ additional_options=f"--checkpoint {checkpoint}",\
command=f"{path_to_script} {command}"), site)])[0] command=f"{path}/{script} {path} {command}"), site)])[0]
return oar_job_id return oar_job_id
def wait_for_completion(oar_job_id, site, sleep_time): def wait_for_completion(oar_job_id, site, sleep_time):
state = "Running" state = "Running"
while state != "Terminated" and state != "Error": while state != "Terminated" and state != "Error":
time.sleep(sleeping_time) time.sleep(sleep_time)
info = get_oar_job_info(oar_job_id, site) info = get_oar_job_info(oar_job_id, site)
state = info["state"] state = info["state"]
@ -30,13 +30,14 @@ def main():
parser.add_argument("--max-duration", required=True, type=int, help="Max Duration in MINUTES of the docker build") parser.add_argument("--max-duration", required=True, type=int, help="Max Duration in MINUTES of the docker build")
parser.add_argument("--checkpoint", required=True, type=int, help="Duration in MINUTES before the end of the job to do the checkpoint") parser.add_argument("--checkpoint", required=True, type=int, help="Duration in MINUTES before the end of the job to do the checkpoint")
parser.add_argument("--besteffort", action='store_false', help="Submit the job as besteffort") parser.add_argument("--besteffort", action='store_false', help="Submit the job as besteffort")
parser.add_argument("--path", required=True, help="Path to the bash script to oarsub") parser.add_argument("--path", required=True, help="Root of the project")
parser.add_argument("--sleep_time", required=False, default=60, help="Time interval in seconds to check the termination of the job") parser.add_argument("--script", required=True, help="Path of the bash script to oarsub relative to the '--path'")
parser.add_argument("--sleep_time", required=False, type=int, default=60, help="Time interval in seconds to check the termination of the job")
parser.add_argument("command", help="ECG Command") parser.add_argument("command", help="ECG Command")
args = parser.parse_args() args = parser.parse_args()
oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint_minutes, args.besteffort, args.path, args.command) oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint, args.besteffort, args.path, args.script, args.command)
wait_oar_job_start(oar_job_id, args.site) wait_oar_job_start(oar_job_id, args.site)