first try for g5k execution

This commit is contained in:
Quentin Guilloteau 2024-07-20 15:41:56 +02:00
parent d7749369b4
commit 8ed548c435
6 changed files with 142 additions and 2 deletions

View File

@ -2,3 +2,11 @@ folder_artifacts_nickel: "artifacts/nickel"
folder_artifacts_json: "artifacts/json"
folder_blacklists: "blacklists"
symlink_blacklist: "blacklist.csv"
system: "g5k" # can be "local" for local execution
site: "grenoble"
cluster: "dahu"
max_duration: 60 # 1 hour
checkpoint: 1 # 1 minute
besteffort: true
sleep_time: 300 # 5 minutes

55
flake.lock generated
View File

@ -18,6 +18,45 @@
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1689068808,
"narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"kapack": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1718633852,
"narHash": "sha256-KVeKDdab2wMYMo60mEQHz8Dus4ddhxJ1HPCXzUt9ei8=",
"owner": "oar-team",
"repo": "nur-kapack",
"rev": "052fb35eb29228d9e4ea8afa09e9f0e390782cbd",
"type": "github"
},
"original": {
"owner": "oar-team",
"repo": "nur-kapack",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1701282334,
@ -37,6 +76,7 @@
"root": {
"inputs": {
"flake-utils": "flake-utils",
"kapack": "kapack",
"nixpkgs": "nixpkgs"
}
},
@ -54,6 +94,21 @@
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",

View File

@ -4,12 +4,15 @@
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/23.11";
flake-utils.url = "github:numtide/flake-utils";
kapack.url = "github:oar-team/nur-kapack";
kapack.inputs.nixpkgs.follows = "nixpkgs";
};
outputs = { self, nixpkgs, flake-utils }:
outputs = { self, nixpkgs, flake-utils, kapack }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs { inherit system; };
kapkgs = kapack.packages.${system};
in
{
devShells = {
@ -18,8 +21,10 @@
snakemake
gawk
nickel
# TODO separate into several shells
(python3.withPackages (ps: with ps; [
requests
kapkgs.execo
]))
];
};

View File

@ -10,6 +10,7 @@ ARTIFACTS_FOLDER_JSON = config["folder_artifacts_json"]
BLACKLIST_FOLDER = config["folder_blacklists"]
BLACKLIST = config["symlink_blacklist"]
EXTENSION = "json"
SYSTEM = config["system"]
ARTIFACTS = get_artifacts_to_build(ARTIFACTS_FOLDER_NICKEL, BLACKLIST)
@ -45,11 +46,18 @@ rule check_artifact:
nickel export --format json --output {output} <<< 'let {{Artifact, ..}} = import "{input.contract}" in ((import "{input.artifact}") | Artifact)'
"""
SHELLS_ECG = {
"local": f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}",
"g5k": f"python3 {{input.execo_wrapper}} --site {config['site']} --cluster {config['cluster']} --max_duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- "
}
rule run_ecg:
input:
"flake.nix",
"flake.lock",
ecg="ecg.py",
execo_wrapper="workflow/scripts/submission_g5k.py",
oar_wrapper="workflow/scripts/ecg_wrapper.oar.bash",
artifact=f"{ARTIFACTS_FOLDER_JSON}/{{artifact}}.{EXTENSION}"
output:
log = "logs/{artifact}/{date}.txt",
@ -57,7 +65,7 @@ rule run_ecg:
build_status = "build_status/{artifact}/{date}.csv",
artifact_hash = "artifact_hash/{artifact}/{date}.csv"
shell:
f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}"
(SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"]
rule update_blacklist:
input:

View File

@ -0,0 +1,17 @@
#!/bin/bash
set -xe
# To "activate" nix on the node
export PATH=~/.local/bin:$PATH
# Install Docker on the node (-t is to store the images on /tmp because it has more disk)
# https://www.grid5000.fr/w/Docker
g5k-setup-docker -t
handler() {
echo "Caught checkpoint signal at: `date`"; echo "Terminating."; exit 0;
}
trap handler SIGUSR2
nix develop --command $@

View File

@ -0,0 +1,47 @@
from execo_g5k import oardel, oarsub, OarSubmission, wait_oar_job_start, get_oar_job_nodes, get_oar_job_info
import time
import argparse
def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path_to_script, command):
reservation_duration = (maximum_duration_minutes + checkpoint_minutes) * 60
checkpoint = checkpoint_minutes * 60
job_type = []
if is_besteffort:
job_type.append("besteffort")
oar_job_id, _site = oarsub([(OarSubmission(f"{{cluster='{cluster}'}}/nodes=1",\
reservation_duration,\
job_type=job_type,\
additional_options=f"--checkpoint {checkpoint}",\
command=f"{path_to_script} {command}"), site)])[0]
return oar_job_id
def wait_for_completion(oar_job_id, site, sleep_time):
state = "Running"
while state != "Terminated" and state != "Error":
time.sleep(sleeping_time)
info = get_oar_job_info(oar_job_id, site)
state = info["state"]
def main():
parser = argparse.ArgumentParser(description="Wrapper script to submit to OAR from a namespace")
parser.add_argument("--site", required=True, help="Grid'5000 site to submit to")
parser.add_argument("--cluster", required=True, help="Cluster to submit to")
parser.add_argument("--max-duration", required=True, type=int, help="Max Duration in MINUTES of the docker build")
parser.add_argument("--checkpoint", required=True, type=int, help="Duration in MINUTES before the end of the job to do the checkpoint")
parser.add_argument("--besteffort", action='store_false', help="Submit the job as besteffort")
parser.add_argument("--path", required=True, help="Path to the bash script to oarsub")
parser.add_argument("--sleep_time", required=False, default=60, help="Time interval in seconds to check the termination of the job")
parser.add_argument("command", help="ECG Command")
args = parser.parse_args()
oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint_minutes, args.besteffort, args.path, args.command)
wait_oar_job_start(oar_job_id, args.site)
wait_for_completion(oar_job_id, args.site, args.sleep_time)
return 0
main()