Parallélisation d’une fonction python sur un cluster utilisant SLURM
Ci-dessous, nous proposons une méthode de parallélisation d’une fonction python sur un cluster utilisant SLURM.
Les conditions préalables sont de :
– créer un environnement conda sur le cluster, conda_env, compatible avec un environnement en local, en particulier au niveau de la version des pickles.
– créer un répertoire /home/{user_name}/modules sur le cluster dans lequel les modules seront copiés.
La fonction suivante do_on_cluster est le décorateur qui permet cette parallélisation.
def do_on_cluster(
    Lx0=None,
    x0=None,
    Dnet=None,
    with_GNU_parallel=True,
    ref=None,
    level="INFO",  # the logging level on the cluster (currently unused here)
    nLmin_by_job=200,  # minimum number of cases to treat per job
    ncore_max=80,  # maximum number of cores to use
    njob_max=1000,
    dir_Lres="dir_Lres",
    with_return_Lres=True,  # kept for backward compatibility (not used below)
    with_fuse_LL=True,  # to fuse the different results of the parallelization
    Lmodule_to_update=None,  # modules to copy to /home/{user}/modules on the cluster
    return_Lres=False,
):
    """Decorator factory that distributes a function over a SLURM cluster.

    The decorated function ``f0`` is applied to every element of ``Lx0``:
    the list is split into chunks of at least ``nLmin_by_job`` items (at most
    ``njob_max`` chunks), the chunks are shipped to the cluster together with
    the pickled ``args``/``kwargs``, a SLURM job processes them on up to
    ``ncore_max`` cores (optionally through GNU parallel), and — when
    ``return_Lres`` is True — the per-chunk results are copied back, loaded
    and fused into a single list which the wrapper returns.

    Parameters
    ----------
    Lx0 : list
        Inputs to distribute; ``f0`` is called once per element.
    x0 : str
        Name of the keyword argument of ``f0`` that receives each element.
    Dnet : dict
        Cluster configuration; must provide "user_cluster", "IP_cluster",
        "conda_env", "cmd_smina" and "queue".
    ref : str or None
        Name of the working directory / job on the cluster; defaults to the
        decorated function's name.
    return_Lres : bool
        When True, fetch the results back and return the fused list.

    ref: https://www.artima.com/weblogs/viewpost.jsp?thread=240845#decorator-functions-with-decorator-arguments
    """
    # -----------------------------------------------
    logger.info("do_on_cluster")
    # BUGFIX: avoid the shared mutable default argument.
    if Lmodule_to_update is None:
        Lmodule_to_update = []
    if Dnet is not None:
        user_cluster = Dnet["user_cluster"]
        IP_cluster = Dnet["IP_cluster"]
        conda_env = Dnet["conda_env"]  # NOTE(review): extracted but unused below
        cmd_smina = Dnet["cmd_smina"]  # NOTE(review): extracted but unused below
        queue = Dnet["queue"]  # NOTE(review): extracted but unused below

    def inner(f0):
        logger.info("\t inside decorator")
        # BUGFIX: the original read `f0.__name__` before `inner` was defined,
        # i.e. before `f0` existed, raising NameError whenever ref was None.
        # The default must be resolved here, once f0 is known.
        nonlocal ref
        if ref is None:
            ref = f0.__name__

        def wrapper(*args, **kwargs):
            # ------------------------------------------------------------------
            # 1) Make sure the cluster has up-to-date copies of our modules.
            logger.info("\t update of the modules")
            update_module_on_cluster(
                Lmodule_to_update,
                Dnet=Dnet,
            )
            # ------------------------------------------------------------------
            path_cluster = f"/home/{user_cluster}"
            path_local = get_path()
            logger.info(f"* path_local={path_local}")
            # ------------------------------------------------------------------
            # 2) Be sure that the same job is not still running, then remove
            #    the previous run's directory on the cluster.
            # BUGFIX: the original command was missing the space between the
            # user name and "--name", yielding an invalid scancel invocation.
            cmd = f"scancel --user {user_cluster} --name {ref}"
            bash_on_remote(
                cmd,
                Dnet,
            )
            logger.info(f"Remove of directory {ref} on cluster ...")
            Lcmd = [f"rm -rf {path_cluster}/{ref}"]
            bash_on_remote(
                Lcmd,
                Dnet,
            )
            # ------------------------------------------------------------------
            logger.info(f"Remove of directory of results in local")
            remove_dir(dir_Lres)
            # ==================================================================
            # 3) Build a local directory gathering all the information needed
            #    to launch the script on the cluster.
            logger.info(f"Creation of the directory to send")
            make_dir(ref)
            os.chdir(ref)
            dump_var(args, "args")
            dump_var(kwargs, "kwargs")
            make_dir(dir_Lres)
            path_local_ref = get_path()
            # Split the inputs into chunks: one pickled chunk file per job line.
            LL = split_L_in_LL(
                Lx0,
                nL=nLmin_by_job,
                nLL_max=njob_max,
            )
            # Never request more cores than there are chunks.
            n_core = min(len(LL), ncore_max)
            Lfile = dump_Lvar(LL, "L")
            create_txt_from_L(Lfile, "Ls.txt")
            create_txt_from_s(str(len(Lfile)), "nLs.txt")
            # ==================================================================
            # 4) job_all.sh: entry point that submits the SLURM job and blocks
            #    until completion (sbatch --wait).
            Ls = """
#!/bin/bash
sbatch --wait job1.sh
wait
""".split("\n")
            create_txt_from_L(Ls[1:], "job_all.sh")
            # ==================================================================
            # 5) job1.sh: the SLURM batch file itself.
            create_slurm_job(
                "do1.py",
                with_GNU_parallel=with_GNU_parallel,
                ref=ref,
                Dnet=Dnet,
                fname="job1.sh",
                n_core=n_core,
                cmd_to_insert_in_0=f"cd /home/{user_cluster}/{ref}",
                s_parallel="parallel1",
            )
            # ------------------------------------------------------------------
            # 6) do1.py: the worker script run for each chunk. It receives the
            #    1-based chunk index as argv[1], loads that chunk, calls f0 on
            #    every element (passed through the keyword named by x0) and,
            #    when requested, dumps the chunk's results into dir_Lres.
            module = f0.__module__
            sf0 = f0.__name__
            Ls_py = f"""
import sys
import os
import traceback
sys.path.insert(0, "/home/{user_cluster}/modules")
# =============================================================================
from tools_SBC.logging_SBC import (set_log, logger)
from tools_SBC.basic import (load_var, dump_var, get_line_n_in_file, move_file, touch,
                             remove_file, remove_Lfile)
# =============================================================================
from {module} import {sf0}
# ------------------------------------------------------------------------------
i_line = int(sys.argv[1])
file = get_line_n_in_file("Ls.txt", i_line)
Lx = load_var(file)
args = load_var("args")
kwargs = load_var("kwargs")
Lres = []
for x in Lx:
    kwargs["{x0}"] = x
    res = {sf0}(**kwargs)
    Lres.append(res)
if {return_Lres}:
    dump_var(Lres, f"{dir_Lres}/{{file}}")
remove_file(file)
""".split("\n")[1:]
            # -----------------------------------------------------------------
            create_txt_from_L(Ls_py, "do1.py")
            # ==================================================================
            # 7) Export the directory to the cluster and launch the job; the
            #    "--wait" in job_all.sh makes the remote call block until done.
            os.chdir("..")
            logger.info(f"Transfert of {ref} on cluster ...")
            bash(
                f"scp -C -r {ref} {user_cluster}@{IP_cluster}:{path_cluster}",
                info=True,
            )
            logger.info(f"Launch of jobs ...")
            Lcmd = [
                f"cd {path_cluster}/{ref}",
                "bash job_all.sh",
            ]
            bash_on_remote(
                Lcmd,
                Dnet,
            )
            logger.info(f"on going ...")
            logger.debug(f"wait for results")
            # ------------------------------------------------------------------
            # 8) Optionally bring the per-chunk results back and fuse them.
            if return_Lres:
                logger.info(f"loading of the results")
                os.chdir(path_local_ref)
                remove_dir(dir_Lres)
                bash(
                    f"scp -C -r {user_cluster}@{IP_cluster}:{path_cluster}/{ref}/{dir_Lres} {path_local_ref}",
                    info=True,
                )
                os.chdir(dir_Lres)
                LLres = load_Lvar("L*")
                os.chdir(path_local)
                return fuse_LL(LLres)
            os.chdir(path_local)

        return wrapper

    return inner
Vous disposez maintenant du décorateur do_on_cluster permettant de paralléliser une fonction f0 sur un cluster.
Exemple 1: Minimisation d’une liste de molécules
# Decorate get_mol_minimized_with_smina so that each molecule of Lmol is
# minimized in its own chunk on the cluster, then call it; return_Lres=True
# makes the call block and hand back the fused list of results.
do_on_cluster(
    Lx0=Lmol,
    x0="mol",
    Dnet=Dnet,
    with_GNU_parallel=True,
    ref="Lscav2",
    level=level,
    nLmin_by_job=nLmin_by_job,
    ncore_max=ncore_max,
    njob_max=njob_max,
    Lmodule_to_update=Lmodule_to_update,
    return_Lres=True,
)(get_mol_minimized_with_smina)()