From 8ed548c435df97620d780eb404e5e4f4314f637f Mon Sep 17 00:00:00 2001 From: Quentin Guilloteau Date: Sat, 20 Jul 2024 15:41:56 +0200 Subject: [PATCH] first try for g5k execution --- config/config.yaml | 8 ++++ flake.lock | 55 +++++++++++++++++++++++++++ flake.nix | 7 +++- workflow/Snakefile | 10 ++++- workflow/scripts/ecg_wrapper.oar.bash | 17 +++++++++ workflow/scripts/submission_g5k.py | 47 +++++++++++++++++++++++ 6 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 workflow/scripts/ecg_wrapper.oar.bash create mode 100644 workflow/scripts/submission_g5k.py diff --git a/config/config.yaml b/config/config.yaml index ddb2c1b..ad06758 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -2,3 +2,11 @@ folder_artifacts_nickel: "artifacts/nickel" folder_artifacts_json: "artifacts/json" folder_blacklists: "blacklists" symlink_blacklist: "blacklist.csv" +system: "g5k" # can be "local" for local execution + +site: "grenoble" +cluster: "dahu" +max_duration: 60 # 1 hour +checkpoint: 1 # 1 minute +besteffort: true +sleep_time: 300 # 5 minutes diff --git a/flake.lock b/flake.lock index b7deb4e..5045839 100644 --- a/flake.lock +++ b/flake.lock @@ -18,6 +18,45 @@ "type": "github" } }, + "flake-utils_2": { + "inputs": { + "systems": "systems_2" + }, + "locked": { + "lastModified": 1689068808, + "narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "kapack": { + "inputs": { + "flake-utils": "flake-utils_2", + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1718633852, + "narHash": "sha256-KVeKDdab2wMYMo60mEQHz8Dus4ddhxJ1HPCXzUt9ei8=", + "owner": "oar-team", + "repo": "nur-kapack", + "rev": "052fb35eb29228d9e4ea8afa09e9f0e390782cbd", + "type": "github" + }, + "original": { + "owner": "oar-team", + "repo": "nur-kapack", + "type": "github" + } + }, "nixpkgs": { "locked": { "lastModified": 1701282334, @@ -37,6 +76,7 @@ "root": { "inputs": { "flake-utils": "flake-utils", + "kapack": "kapack", "nixpkgs": "nixpkgs" } }, @@ -54,6 +94,21 @@ "repo": "default", "type": "github" } + }, + "systems_2": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 1b33df5..e0a5c1f 100644 --- a/flake.nix +++ b/flake.nix @@ -4,12 +4,15 @@ inputs = { nixpkgs.url = "github:nixos/nixpkgs/23.11"; flake-utils.url = "github:numtide/flake-utils"; + kapack.url = "github:oar-team/nur-kapack"; + kapack.inputs.nixpkgs.follows = "nixpkgs"; }; - outputs = { self, nixpkgs, flake-utils }: + outputs = { self, nixpkgs, flake-utils, kapack }: flake-utils.lib.eachDefaultSystem (system: let pkgs = import nixpkgs { inherit system; }; + kapkgs = kapack.packages.${system}; in { devShells = { @@ -18,8 +21,10 @@ snakemake gawk nickel + # TODO separate into several shells (python3.withPackages (ps: with ps; [ requests + kapkgs.execo ])) ]; }; diff --git a/workflow/Snakefile b/workflow/Snakefile index e0772aa..3697878 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -10,6 +10,7 @@ ARTIFACTS_FOLDER_JSON = config["folder_artifacts_json"] BLACKLIST_FOLDER = config["folder_blacklists"] BLACKLIST = config["symlink_blacklist"] EXTENSION = "json" +SYSTEM = config["system"] ARTIFACTS = get_artifacts_to_build(ARTIFACTS_FOLDER_NICKEL, BLACKLIST) @@ -45,11 +46,18 @@ rule check_artifact: nickel export --format json --output {output} <<< 'let {{Artifact, ..}} = import "{input.contract}" in ((import "{input.artifact}") | Artifact)' """ +SHELLS_ECG = { + "local": f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}", + "g5k": f"python3 {{input.execo_wrapper}} --site {config['site']} --cluster {config['cluster']} --max_duration {config['max_duration']} --checkpoint {config['checkpoint']} {'--besteffort' if config['besteffort'] else ''} --sleep_time {config['sleep_time']} -- " +} + rule run_ecg: input: "flake.nix", "flake.lock", ecg="ecg.py", + execo_wrapper="workflow/scripts/submission_g5k.py", + oar_wrapper="workflow/scripts/ecg_wrapper.oar.bash", artifact=f"{ARTIFACTS_FOLDER_JSON}/{{artifact}}.{EXTENSION}" output: log = "logs/{artifact}/{date}.txt", @@ -57,7 +65,7 @@ rule run_ecg: build_status = "build_status/{artifact}/{date}.csv", artifact_hash = "artifact_hash/{artifact}/{date}.csv" shell: - f"python3 {{input.ecg}} -l {{output.log}} -p {{output.pkg}} -b {{output.build_status}} -a {{output.artifact_hash}} {ARTIFACTS_FOLDER_JSON}/{{wildcards.artifact}}.{EXTENSION}" + (SHELLS_ECG["g5k"] if SYSTEM == "g5k" else "") + SHELLS_ECG["local"] rule update_blacklist: input: diff --git a/workflow/scripts/ecg_wrapper.oar.bash b/workflow/scripts/ecg_wrapper.oar.bash new file mode 100644 index 0000000..98dc257 --- /dev/null +++ b/workflow/scripts/ecg_wrapper.oar.bash @@ -0,0 +1,17 @@ +#!/bin/bash + +set -xe + +# To "activate" nix on the node +export PATH=~/.local/bin:$PATH + +# Install Docker on the node (-t is to store the images on /tmp because it has more disk) +# https://www.grid5000.fr/w/Docker +g5k-setup-docker -t + +handler() { + echo "Caught checkpoint signal at: `date`"; echo "Terminating."; exit 0; +} +trap handler SIGUSR2 + +nix develop --command $@ diff --git a/workflow/scripts/submission_g5k.py b/workflow/scripts/submission_g5k.py new file mode 100644 index 0000000..8c5e42f --- /dev/null +++ b/workflow/scripts/submission_g5k.py @@ -0,0 +1,47 @@ +from execo_g5k import oardel, oarsub, OarSubmission, wait_oar_job_start, get_oar_job_nodes, get_oar_job_info +import time +import argparse + +def submit_job(cluster, site, maximum_duration_minutes, checkpoint_minutes, is_besteffort, path_to_script, command): + reservation_duration = (maximum_duration_minutes + checkpoint_minutes) * 60 + checkpoint = checkpoint_minutes * 60 + job_type = [] + if is_besteffort: + job_type.append("besteffort") + + oar_job_id, _site = oarsub([(OarSubmission(f"{{cluster='{cluster}'}}/nodes=1",\ + reservation_duration,\ + job_type=job_type,\ + additional_options=f"--checkpoint {checkpoint}",\ + command=f"{path_to_script} {command}"), site)])[0] + return oar_job_id + +def wait_for_completion(oar_job_id, site, sleep_time): + state = "Running" + while state != "Terminated" and state != "Error": + time.sleep(sleeping_time) + info = get_oar_job_info(oar_job_id, site) + state = info["state"] + +def main(): + parser = argparse.ArgumentParser(description="Wrapper script to submit to OAR from a namespace") + parser.add_argument("--site", required=True, help="Grid'5000 site to submit to") + parser.add_argument("--cluster", required=True, help="Cluster to submit to") + parser.add_argument("--max-duration", required=True, type=int, help="Max Duration in MINUTES of the docker build") + parser.add_argument("--checkpoint", required=True, type=int, help="Duration in MINUTES before the end of the job to do the checkpoint") + parser.add_argument("--besteffort", action='store_false', help="Submit the job as besteffort") + parser.add_argument("--path", required=True, help="Path to the bash script to oarsub") + parser.add_argument("--sleep_time", required=False, default=60, help="Time interval in seconds to check the termination of the job") + parser.add_argument("command", help="ECG Command") + + args = parser.parse_args() + + oar_job_id = submit_job(args.cluster, args.site, args.max_duration, args.checkpoint_minutes, args.besteffort, args.path, args.command) + + wait_oar_job_start(oar_job_id, args.site) + + wait_for_completion(oar_job_id, args.site, args.sleep_time) + + return 0 + +main()