Compare commits: 8ed548c435...ef257e6026

4 commits:
ef257e6026
566eb778f2
318955865c
dd6eb9bcbb
config/config.yaml

@@ -3,10 +3,12 @@ folder_artifacts_json: "artifacts/json"
 folder_blacklists: "blacklists"
 symlink_blacklist: "blacklist.csv"
 system: "g5k" # can be "local" for local execution
+prefix: "outputs"
 
 site: "grenoble"
 cluster: "dahu"
 max_duration: 60 # 1 hour
 checkpoint: 1 # 1 minute
-besteffort: true
-sleep_time: 300 # 5 minutes
+besteffort: True
+#sleep_time: 300 # 5 minutes
+sleep_time: 30 # 0.5 minutes
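
A note on two of these edits: under YAML 1.1 (as parsed by PyYAML, which Snakemake uses for configfile:), "true" and "True" resolve to the same boolean, so the besteffort change is purely stylistic; the functional changes are the new prefix key and the shorter sleep_time. A minimal sketch, assuming PyYAML is installed:

import yaml  # PyYAML

# Both spellings resolve to the Python boolean True under YAML 1.1.
print(yaml.safe_load("besteffort: true"))   # {'besteffort': True}
print(yaml.safe_load("besteffort: True"))   # {'besteffort': True}
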
ecg.py | 17

@@ -105,7 +105,7 @@ def download_file(url, dest):
     hash_process = subprocess.run(f"sha256sum {file.name} | cut -d ' ' -f 1 | tr -d '\n'", capture_output=True, shell=True)
     return hash_process.stdout.decode("utf-8")
 
-def download_sources(config):
+def download_sources(config, tmp_dir):
     """
     Downloads the source of the artifact in 'config'.
 

@@ -121,7 +121,7 @@ def download_sources(config):
     """
     url = config["artifact_url"]
     artifact_name = trim(url)
-    artifact_dir = os.path.join(cachedir_path, artifact_name)
+    artifact_dir = os.path.join(tmp_dir, artifact_name)
     # Checking if artifact in cache. Not downloading if it is:
     if not os.path.exists(artifact_dir) or not use_cache:
         logging.info(f"Downloading artifact from {url}")

@@ -148,7 +148,7 @@ def download_sources(config):
         logging.info(f"Cache found for {url}, skipping download")
     return artifact_dir
 
-def buildstatus_saver(output):
+def buildstatus_saver(output, buildstatus_path):
     """
     Parses the given 'output' to indentify the errors, then saves them to the
     'build_status' file.

@@ -347,13 +347,16 @@ def main():
     # print(config)
     config_file.close()
 
-    src_dir = download_sources(config)
-    return_code, build_output = build_image(config, src_dir)
+    tmp_dir = tempfile.TemporaryDirectory()
+    artifact_dir = download_sources(config, tmp_dir.name)
+    return_code, build_output = build_image(config, artifact_dir)
     if return_code == 0:
-        check_env(config, src_dir)
+        check_env(config, artifact_dir)
         remove_image(config)
+        pathlib.Path(buildstatus_path).touch()
     else:
-        buildstatus_saver(build_output)
+        pathlib.Path(pkglist_path).touch()
+        buildstatus_saver(build_output, buildstatus_path)
 
     if not use_cache:
         os.system(f"rm -rf {os.path.join(cachedir_path, trim(config['artifact_url']))}")
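
The move from a persistent cache directory to tempfile.TemporaryDirectory() means the downloaded sources now live only as long as the tmp_dir object, which is presumably why main() keeps the object itself rather than just tmp_dir.name. A minimal sketch of that lifetime (the artifact name is hypothetical):

import os
import tempfile

tmp_dir = tempfile.TemporaryDirectory()                        # directory created here
artifact_dir = os.path.join(tmp_dir.name, "example-artifact")  # hypothetical name
os.makedirs(artifact_dir)
print(os.path.exists(artifact_dir))   # True while tmp_dir is alive
tmp_dir.cleanup()                     # removed here, or when tmp_dir is garbage-collected
print(os.path.exists(artifact_dir))   # False
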

Snakefile

@@ -2,6 +2,7 @@ configfile: "config/config.yaml"
 
 include: "utils.smk"
 
+import os
 import datetime
 DATE = datetime.datetime.now().strftime("%Y%m%d")
 

@@ -11,17 +12,18 @@ BLACKLIST_FOLDER = config["folder_blacklists"]
 BLACKLIST = config["symlink_blacklist"]
 EXTENSION = "json"
 SYSTEM = config["system"]
+PREFIX = config["prefix"]
 
 ARTIFACTS = get_artifacts_to_build(ARTIFACTS_FOLDER_NICKEL, BLACKLIST)
 
 rule all:
     input:
-        expand("{folder}/{artifact}/{date}.csv",\
+        expand(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.csv",\
             folder=["pkgs", "build_status", "artifact_hash"],\
             artifact=ARTIFACTS,\
             date=DATE
         ),
-        expand("{folder}/{artifact}/{date}.txt",\
+        expand(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.txt",\
             folder=["logs"],\
             artifact=ARTIFACTS,\
             date=DATE

@@ -48,7 +50,7 @@ rule check_artifact:
 
 SHELLS_ECG = {
     "local": f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}",
-    "g5k": f"python3 {{input.execo_wrapper}} --site {config['site']} --cluster {config['cluster']} --max_duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- "
+    "g5k": f"python3 {{input.execo_wrapper}} --path {os.getcwd()} --script {{input.oar_wrapper}} --site {config['site']} --cluster {config['cluster']} --max-duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- '"
 }
 
 rule run_ecg:

@@ -60,19 +62,19 @@ rule run_ecg:
         oar_wrapper="workflow/scripts/ecg_wrapper.oar.bash",
         artifact=f"{ARTIFACTS_FOLDER_JSON}/{{artifact}}.{EXTENSION}"
     output:
-        log = "logs/{artifact}/{date}.txt",
-        pkg = "pkgs/{artifact}/{date}.csv",
-        build_status = "build_status/{artifact}/{date}.csv",
-        artifact_hash = "artifact_hash/{artifact}/{date}.csv"
+        log = f"{PREFIX}/logs/{{artifact}}/{{date}}.txt",
+        pkg = f"{PREFIX}/pkgs/{{artifact}}/{{date}}.csv",
+        build_status = f"{PREFIX}/build_status/{{artifact}}/{{date}}.csv",
+        artifact_hash = f"{PREFIX}/artifact_hash/{{artifact}}/{{date}}.csv"
     shell:
-        (SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"]
+        (SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"] + ("'" if SYSTEM == "g5k" else "")
 
 rule update_blacklist:
     input:
         BLACKLIST,
-        build_status=expand("build_status/{artifact}/{{date}}.csv",\
+        build_status=expand(f"{PREFIX}/build_status/{{artifact}}/{{{{date}}}}.csv",\
             artifact=ARTIFACTS)
     output:
         f"{BLACKLIST_FOLDER}/{{date}}.csv"
     shell:
-        f"cat {{input}} > {{output}} && ln -s {{output}} {BLACKLIST}"
+        f"cat {{input}} > {{output}} && rm -rf {BLACKLIST} && ln -s {{output}} {BLACKLIST}"
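
Two details of the Snakefile changes are easy to misread. First, the brace doubling: inside an f-string, {{folder}} emits the literal {folder} that expand() later fills, while {PREFIX} is interpolated immediately; in update_blacklist, the quadrupled {{{{date}}}} therefore survives expand() as the wildcard {date}. Second, the new trailing ' on the g5k command opens a single quote around the ECG command, which the updated shell: expression closes with ("'" if SYSTEM == "g5k" else ""). A small demonstration of the brace behaviour (the expand import assumes a Snakemake installation):

from snakemake.io import expand  # assumes Snakemake is installed

PREFIX = "outputs"
print(f"{PREFIX}/{{folder}}/{{artifact}}/{{date}}.csv")
# outputs/{folder}/{artifact}/{date}.csv

# Quadrupled braces keep {date} as a Snakemake wildcard after expand():
print(expand(f"{PREFIX}/build_status/{{artifact}}/{{{{date}}}}.csv", artifact=["a1"]))
# ['outputs/build_status/a1/{date}.csv']
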
workflow/scripts/ecg_wrapper.oar.bash | 4 (Normal file → Executable file)

@@ -14,4 +14,8 @@ handler() {
 }
 trap handler SIGUSR2
 
+cd $1
+
+shift
+
 nix develop --command $@
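
With cd $1; shift, the wrapper now expects the project root as its first argument and hands everything after it to nix develop, matching the new command=f"{path}/{script} {path} {command}" in the execo wrapper below. A hypothetical invocation, sketched in Python (path and command are made up for illustration):

import subprocess

path = "/home/user/project"                         # hypothetical project root
script = "workflow/scripts/ecg_wrapper.oar.bash"
command = "python3 ecg.py artifacts/json/foo.json"  # hypothetical ECG command

# $1 is consumed by `cd $1; shift`; the rest runs inside `nix develop`.
subprocess.run(f"{path}/{script} {path} {command}", shell=True)
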

@@ -2,7 +2,7 @@ from execo_g5k import oardel, oarsub, OarSubmission, wait_oar_job_start, get_oar
 import time
 import argparse
 
-def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path_to_script, command):
+def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path, script, command):
     reservation_duration = (maximum_duration_minutes + checkpoint_minutes) * 60
     checkpoint = checkpoint_minutes * 60
     job_type = []

@@ -13,13 +13,13 @@ def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_b
         reservation_duration,\
         job_type=job_type,\
         additional_options=f"--checkpoint {checkpoint}",\
-        command=f"{path_to_script} {command}"), site)])[0]
+        command=f"{path}/{script} {path} {command}"), site)])[0]
     return oar_job_id
 
 def wait_for_completion(oar_job_id, site, sleep_time):
     state = "Running"
     while state != "Terminated" and state != "Error":
-        time.sleep(sleeping_time)
+        time.sleep(sleep_time)
         info = get_oar_job_info(oar_job_id, site)
         state = info["state"]
 

@@ -30,13 +30,14 @@ def main():
     parser.add_argument("--max-duration", required=True, type=int, help="Max Duration in MINUTES of the docker build")
     parser.add_argument("--checkpoint", required=True, type=int, help="Duration in MINUTES before the end of the job to do the checkpoint")
     parser.add_argument("--besteffort", action='store_false', help="Submit the job as besteffort")
-    parser.add_argument("--path", required=True, help="Path to the bash script to oarsub")
+    parser.add_argument("--path", required=True, help="Root of the project")
-    parser.add_argument("--sleep_time", required=False, default=60, help="Time interval in seconds to check the termination of the job")
+    parser.add_argument("--script", required=True, help="Path of the bash script to oarsub relative to the '--path'")
+    parser.add_argument("--sleep_time", required=False, type=int, default=60, help="Time interval in seconds to check the termination of the job")
     parser.add_argument("command", help="ECG Command")
 
     args = parser.parse_args()
 
-    oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint_minutes, args.besteffort, args.path, args.command)
+    oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint, args.besteffort, args.path, args.script, args.command)
 
     wait_oar_job_start(oar_job_id, args.site)
 
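
One pre-existing quirk this diff leaves untouched: --besteffort is declared with action='store_false', so the option defaults to True and passing the flag actually disables besteffort, the opposite of what its help text suggests. A minimal demonstration:

import argparse

p = argparse.ArgumentParser()
p.add_argument("--besteffort", action="store_false", help="Submit the job as besteffort")
print(p.parse_args([]).besteffort)                # True  (flag omitted)
print(p.parse_args(["--besteffort"]).besteffort)  # False (flag passed)
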