Compare commits
4 Commits
8ed548c435
...
ef257e6026
Author | SHA1 | Date | |
---|---|---|---|
|
ef257e6026 | ||
|
566eb778f2 | ||
|
318955865c | ||
|
dd6eb9bcbb |
@ -3,10 +3,12 @@ folder_artifacts_json: "artifacts/json"
|
||||
folder_blacklists: "blacklists"
|
||||
symlink_blacklist: "blacklist.csv"
|
||||
system: "g5k" # can be "local" for local execution
|
||||
prefix: "outputs"
|
||||
|
||||
site: "grenoble"
|
||||
cluster: "dahu"
|
||||
max_duration: 60 # 1 hour
|
||||
checkpoint: 1 # 1 minute
|
||||
besteffort: true
|
||||
sleep_time: 300 # 5 minutes
|
||||
besteffort: True
|
||||
#sleep_time: 300 # 5 minutes
|
||||
sleep_time: 30 # 0.5 minutes
|
||||
|
17
ecg.py
17
ecg.py
@ -105,7 +105,7 @@ def download_file(url, dest):
|
||||
hash_process = subprocess.run(f"sha256sum {file.name} | cut -d ' ' -f 1 | tr -d '\n'", capture_output=True, shell=True)
|
||||
return hash_process.stdout.decode("utf-8")
|
||||
|
||||
def download_sources(config):
|
||||
def download_sources(config, tmp_dir):
|
||||
"""
|
||||
Downloads the source of the artifact in 'config'.
|
||||
|
||||
@ -121,7 +121,7 @@ def download_sources(config):
|
||||
"""
|
||||
url = config["artifact_url"]
|
||||
artifact_name = trim(url)
|
||||
artifact_dir = os.path.join(cachedir_path, artifact_name)
|
||||
artifact_dir = os.path.join(tmp_dir, artifact_name)
|
||||
# Checking if artifact in cache. Not downloading if it is:
|
||||
if not os.path.exists(artifact_dir) or not use_cache:
|
||||
logging.info(f"Downloading artifact from {url}")
|
||||
@ -148,7 +148,7 @@ def download_sources(config):
|
||||
logging.info(f"Cache found for {url}, skipping download")
|
||||
return artifact_dir
|
||||
|
||||
def buildstatus_saver(output):
|
||||
def buildstatus_saver(output, buildstatus_path):
|
||||
"""
|
||||
Parses the given 'output' to indentify the errors, then saves them to the
|
||||
'build_status' file.
|
||||
@ -347,13 +347,16 @@ def main():
|
||||
# print(config)
|
||||
config_file.close()
|
||||
|
||||
src_dir = download_sources(config)
|
||||
return_code, build_output = build_image(config, src_dir)
|
||||
tmp_dir = tempfile.TemporaryDirectory()
|
||||
artifact_dir = download_sources(config, tmp_dir.name)
|
||||
return_code, build_output = build_image(config, artifact_dir)
|
||||
if return_code == 0:
|
||||
check_env(config, src_dir)
|
||||
check_env(config, artifact_dir)
|
||||
remove_image(config)
|
||||
pathlib.Path(buildstatus_path).touch()
|
||||
else:
|
||||
buildstatus_saver(build_output)
|
||||
pathlib.Path(pkglist_path).touch()
|
||||
buildstatus_saver(build_output, buildstatus_path)
|
||||
|
||||
if not use_cache:
|
||||
os.system(f"rm -rf {os.path.join(cachedir_path, trim(config['artifact_url']))}")
|
||||
|
@ -2,6 +2,7 @@ configfile: "config/config.yaml"
|
||||
|
||||
include: "utils.smk"
|
||||
|
||||
import os
|
||||
import datetime
|
||||
DATE = datetime.datetime.now().strftime("%Y%m%d")
|
||||
|
||||
@ -11,17 +12,18 @@ BLACKLIST_FOLDER = config["folder_blacklists"]
|
||||
BLACKLIST = config["symlink_blacklist"]
|
||||
EXTENSION = "json"
|
||||
SYSTEM = config["system"]
|
||||
PREFIX = config["prefix"]
|
||||
|
||||
ARTIFACTS = get_artifacts_to_build(ARTIFACTS_FOLDER_NICKEL, BLACKLIST)
|
||||
|
||||
rule all:
|
||||
input:
|
||||
expand("{folder}/{artifact}/{date}.csv",\
|
||||
expand(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.csv",\
|
||||
folder=["pkgs", "build_status", "artifact_hash"],\
|
||||
artifact=ARTIFACTS,\
|
||||
date=DATE
|
||||
),
|
||||
expand("{folder}/{artifact}/{date}.txt",\
|
||||
expand(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.txt",\
|
||||
folder=["logs"],\
|
||||
artifact=ARTIFACTS,\
|
||||
date=DATE
|
||||
@ -48,7 +50,7 @@ rule check_artifact:
|
||||
|
||||
SHELLS_ECG = {
|
||||
"local": f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}",
|
||||
"g5k": f"python3 {{input.execo_wrapper}} --site {config['site']} --cluster {config['cluster']} --max_duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- "
|
||||
"g5k": f"python3 {{input.execo_wrapper}} --path {os.getcwd()} --script {{input.oar_wrapper}} --site {config['site']} --cluster {config['cluster']} --max-duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- '"
|
||||
}
|
||||
|
||||
rule run_ecg:
|
||||
@ -60,19 +62,19 @@ rule run_ecg:
|
||||
oar_wrapper="workflow/scripts/ecg_wrapper.oar.bash",
|
||||
artifact=f"{ARTIFACTS_FOLDER_JSON}/{{artifact}}.{EXTENSION}"
|
||||
output:
|
||||
log = "logs/{artifact}/{date}.txt",
|
||||
pkg = "pkgs/{artifact}/{date}.csv",
|
||||
build_status = "build_status/{artifact}/{date}.csv",
|
||||
artifact_hash = "artifact_hash/{artifact}/{date}.csv"
|
||||
log = f"{PREFIX}/logs/{{artifact}}/{{date}}.txt",
|
||||
pkg = f"{PREFIX}/pkgs/{{artifact}}/{{date}}.csv",
|
||||
build_status = f"{PREFIX}/build_status/{{artifact}}/{{date}}.csv",
|
||||
artifact_hash = f"{PREFIX}/artifact_hash/{{artifact}}/{{date}}.csv"
|
||||
shell:
|
||||
(SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"]
|
||||
(SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"] + ("'" if SYSTEM == "g5k" else "")
|
||||
|
||||
rule update_blacklist:
|
||||
input:
|
||||
BLACKLIST,
|
||||
build_status=expand("build_status/{artifact}/{{date}}.csv",\
|
||||
build_status=expand(f"{PREFIX}/build_status/{{artifact}}/{{{{date}}}}.csv",\
|
||||
artifact=ARTIFACTS)
|
||||
output:
|
||||
f"{BLACKLIST_FOLDER}/{{date}}.csv"
|
||||
shell:
|
||||
f"cat {{input}} > {{output}} && ln -s {{output}} {BLACKLIST}"
|
||||
f"cat {{input}} > {{output}} && rm -rf {BLACKLIST} && ln -s {{output}} {BLACKLIST}"
|
||||
|
4
workflow/scripts/ecg_wrapper.oar.bash
Normal file → Executable file
4
workflow/scripts/ecg_wrapper.oar.bash
Normal file → Executable file
@ -14,4 +14,8 @@ handler() {
|
||||
}
|
||||
trap handler SIGUSR2
|
||||
|
||||
cd $1
|
||||
|
||||
shift
|
||||
|
||||
nix develop --command $@
|
||||
|
@ -2,7 +2,7 @@ from execo_g5k import oardel, oarsub, OarSubmission, wait_oar_job_start, get_oar
|
||||
import time
|
||||
import argparse
|
||||
|
||||
def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path_to_script, command):
|
||||
def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path, script, command):
|
||||
reservation_duration = (maximum_duration_minutes + checkpoint_minutes) * 60
|
||||
checkpoint = checkpoint_minutes * 60
|
||||
job_type = []
|
||||
@ -13,13 +13,13 @@ def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_b
|
||||
reservation_duration,\
|
||||
job_type=job_type,\
|
||||
additional_options=f"--checkpoint {checkpoint}",\
|
||||
command=f"{path_to_script} {command}"), site)])[0]
|
||||
command=f"{path}/{script} {path} {command}"), site)])[0]
|
||||
return oar_job_id
|
||||
|
||||
def wait_for_completion(oar_job_id, site, sleep_time):
|
||||
state = "Running"
|
||||
while state != "Terminated" and state != "Error":
|
||||
time.sleep(sleeping_time)
|
||||
time.sleep(sleep_time)
|
||||
info = get_oar_job_info(oar_job_id, site)
|
||||
state = info["state"]
|
||||
|
||||
@ -30,13 +30,14 @@ def main():
|
||||
parser.add_argument("--max-duration", required=True, type=int, help="Max Duration in MINUTES of the docker build")
|
||||
parser.add_argument("--checkpoint", required=True, type=int, help="Duration in MINUTES before the end of the job to do the checkpoint")
|
||||
parser.add_argument("--besteffort", action='store_false', help="Submit the job as besteffort")
|
||||
parser.add_argument("--path", required=True, help="Path to the bash script to oarsub")
|
||||
parser.add_argument("--sleep_time", required=False, default=60, help="Time interval in seconds to check the termination of the job")
|
||||
parser.add_argument("--path", required=True, help="Root of the project")
|
||||
parser.add_argument("--script", required=True, help="Path of the bash script to oarsub relative to the '--path'")
|
||||
parser.add_argument("--sleep_time", required=False, type=int, default=60, help="Time interval in seconds to check the termination of the job")
|
||||
parser.add_argument("command", help="ECG Command")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint_minutes, args.besteffort, args.path, args.command)
|
||||
oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint, args.besteffort, args.path, args.script, args.command)
|
||||
|
||||
wait_oar_job_start(oar_job_id, args.site)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user