Parallélisation d’une fonction python sur un cluster utilisant SLURM
Ci-dessous, nous proposons une méthode de parallélisation d’une fonction Python sur un cluster utilisant SLURM.
Les conditions préalables sont de :
– créer un environnement conda sur le cluster, conda_env, compatible avec l’environnement local, en particulier en ce qui concerne la version des pickles ;
– créer un répertoire /home/{user_name}/modules sur le cluster dans lequel les modules seront copiés.
La fonction suivante, do_on_cluster, est le décorateur qui permet cette parallélisation :
def do_on_cluster(
        Lx0=None,
        x0=None,
        Dnet=None,
        with_GNU_parallel=True,
        ref=None,
        level="INFO",  # the logging level on the cluster
        nLmin_by_job=200,  # Number min of cases to treat by job
        ncore_max=80,  # number max of core to use
        njob_max=1000,
        dir_Lres="dir_Lres",
        with_return_Lres=True,
        with_fuse_LL=True,  # to fuse the differents results of the parallelization
        Lmodule_to_update=None,
        return_Lres=False,
    ):
    """Decorator factory running a function over ``Lx0`` on a SLURM cluster.

    The decorated function is not executed locally.  Calling the wrapper:

    1. updates the listed modules on the cluster,
    2. cancels any job named ``ref`` and removes the remote ``ref`` directory,
    3. builds a local ``ref`` directory holding the dumped ``args``/``kwargs``,
       the chunks of ``Lx0``, ``job_all.sh``, ``job1.sh`` and ``do1.py``,
    4. copies that directory to ``/home/<user_cluster>`` with ``scp`` and
       runs ``job_all.sh`` there,
    5. optionally copies the result directory back and returns the fused
       results.

    ref: https://www.artima.com/weblogs/viewpost.jsp?thread=240845#decorator-functions-with-decorator-arguments

    Args:
        Lx0: values to distribute; each one is passed to the function under
            the keyword name ``x0``.
        x0: name of the keyword argument receiving one element of ``Lx0``.
        Dnet: connection settings.  ``"user_cluster"`` and ``"IP_cluster"``
            are read here; the dict is also handed to the remote helpers.
        with_GNU_parallel: forwarded to ``create_slurm_job``.
        ref: name of the working directory (local and remote) and of the
            job to cancel.  Defaults to the decorated function's name.
        level: not used by the code of this function.
        nLmin_by_job: forwarded to ``split_L_in_LL`` as ``nL``.
        ncore_max: upper bound for the ``n_core`` given to
            ``create_slurm_job``.
        njob_max: forwarded to ``split_L_in_LL`` as ``nLL_max``.
        dir_Lres: name of the directory receiving the per-chunk results.
        with_return_Lres: not used by the code of this function.
        with_fuse_LL: not used by the code of this function; results are
            always fused when they are returned.
        Lmodule_to_update: modules handed to ``update_module_on_cluster``
            (``None`` means an empty list).
        return_Lres: when true, results are dumped on the cluster, copied
            back and returned by the wrapper.

    Returns:
        A decorator.  The wrapper it produces returns ``fuse_LL`` of the
        loaded results when ``return_Lres`` is true, else ``None``.

    Raises:
        ValueError: when the wrapper is called while ``Dnet``, ``Lx0`` or
            ``x0`` is ``None``, or when ``ref`` is empty, ``"."`` or
            ``".."``.
    """
    import functools  # local import: only needed by this decorator

    # -----------------------------------------------
    logger.info("do_on_cluster")
    if Dnet is not None:
        user_cluster = Dnet["user_cluster"]
        IP_cluster = Dnet["IP_cluster"]
    else:
        # Reported as a ValueError when the wrapper is actually called.
        user_cluster = IP_cluster = None
    # A fresh list per decorator instead of a shared mutable default.
    Lmodule = [] if Lmodule_to_update is None else Lmodule_to_update

    def inner(f0):
        logger.info("\t inside decorator")
        # The default reference can only be resolved once f0 is known.
        job_ref = f0.__name__ if ref is None else ref

        @functools.wraps(f0)
        def wrapper(*args, **kwargs):
            # Validate before any remote command is issued.
            if Dnet is None:
                raise ValueError("do_on_cluster: Dnet is required")
            if Lx0 is None or x0 is None:
                raise ValueError("do_on_cluster: Lx0 and x0 are required")
            if str(job_ref).strip() in ("", ".", ".."):
                # An empty reference would turn the remote cleanup into
                # `rm -rf /home/<user>/`.
                raise ValueError(f"do_on_cluster: invalid ref {job_ref!r}")
            # --------------------------------------------------------------------
            logger.info("\t update of the modules")
            update_module_on_cluster(
                Lmodule,
                Dnet=Dnet,
                )
            # --------------------------------------------------------------------------
            path_cluster = f"/home/{user_cluster}"
            path_local = get_path()
            logger.info(f"* path_local={path_local}")
            LLres = None
            try:
                # to be sure that the same job is not still running
                # ----------------------------------------------------------------------
                # Passed as a list, like the other bash_on_remote calls.
                bash_on_remote(
                    [f"scancel --user {user_cluster} --name {job_ref}"],
                    Dnet,
                )
                # ----------------------------------------------------------------------
                logger.info(f"Remove of directory {job_ref} on cluster ...")
                bash_on_remote(
                    [f"rm -rf {path_cluster}/{job_ref}"],
                    Dnet,
                )
                # ----------------------------------------------------------------------
                logger.info("Remove of directory of results in local")
                remove_dir(dir_Lres)
                # ======================================================================
                # Creation of a new directory to gather all the informations
                # useful to launch the script on the cluster.
                # ----------------------------------------------------------------------
                logger.info("Creation of the directory to send")
                make_dir(job_ref)
                os.chdir(job_ref)
                dump_var(args, "args")
                dump_var(kwargs, "kwargs")
                make_dir(dir_Lres)
                path_local_ref = get_path()
                # ----------------------------------------------------------------------
                LL = split_L_in_LL(
                    Lx0,
                    nL=nLmin_by_job,
                    nLL_max=njob_max,
                )
                # Never request more cores than there are chunks.
                n_core = min(len(LL), ncore_max)
                Lfile = dump_Lvar(LL, "L")
                create_txt_from_L(Lfile, "Ls.txt")
                create_txt_from_s(str(len(Lfile)), "nLs.txt")
                # ======================================================================
                # Creation of job_all.sh to launch of the different jobs
                # can be sbatch or bash
                # ----------------------------------------------------------------------
                create_txt_from_L(
                    [
                        "#!/bin/bash",
                        "sbatch --wait job1.sh",
                        "wait",
                        "",
                    ],
                    "job_all.sh",
                )
                # ======================================================================
                # Creation of job1.sh slurm file
                # ----------------------------------------------------------------------
                create_slurm_job(
                    "do1.py",
                    with_GNU_parallel=with_GNU_parallel,
                    ref=job_ref,
                    Dnet=Dnet,
                    fname="job1.sh",
                    n_core=n_core,
                    cmd_to_insert_in_0=f"cd /home/{user_cluster}/{job_ref}",
                    s_parallel="parallel1",
                )
                # ----------------------------------------------------------------------
                # do1.py file: worker script run once per line of Ls.txt
                # ----------------------------------------------------------------------
                # NOTE(review): the worker imports f0 from f0.__module__, so
                # f0 presumably has to live in a module available under
                # /home/<user>/modules on the cluster - confirm.
                module = f0.__module__
                sf0 = f0.__name__
                sep_eq = "# " + "=" * 77
                sep_dash = "# " + "-" * 78
                Ls_py = [
                    "import sys",
                    "import os",
                    "import traceback",
                    f'sys.path.insert(0, "/home/{user_cluster}/modules")',
                    sep_eq,
                    "from tools_SBC.logging_SBC import  (set_log, logger)",
                    "from tools_SBC.basic import (load_var, dump_var, "
                    "get_line_n_in_file, move_file, touch,",
                    "                remove_file, remove_Lfile)",
                    sep_eq,
                    f"from {module} import {sf0}",
                    sep_dash,
                    "i_line = int(sys.argv[1])",
                    'file = get_line_n_in_file("Ls.txt", i_line)',
                    "Lx = load_var(file)",
                    'args = load_var("args")',
                    'kwargs = load_var("kwargs")',
                    "Lres = []",
                    "for x in Lx:",
                    f"    kwargs[{x0!r}] = x",
                    # Positional arguments are dumped above, so forward them.
                    f"    res = {sf0}(*args, **kwargs)",
                    "    Lres.append(res)",
                    f"if {bool(return_Lres)}:",
                    f'    dump_var(Lres, f"{dir_Lres}/{{file}}")',
                    "remove_file(file)",
                    "",
                ]
                # ----------------------------------------------------------------------
                create_txt_from_L(Ls_py, "do1.py")
                # ======================================================================
                # Export of the directory on the cluster and launch of the job
                # ----------------------------------------------------------------------
                os.chdir("..")
                logger.info(f"Transfert  of {job_ref} on cluster ...")
                bash(
                    f"scp -C -r {job_ref} {user_cluster}@{IP_cluster}:{path_cluster}",
                    info=True,
                )
                # ----------------------------------------------------------------------
                # Launch of the jobs
                # ----------------------------------------------------------------------
                logger.info("Launch of jobs ...")
                bash_on_remote(
                    [f"cd {path_cluster}/{job_ref}", "bash job_all.sh"],
                    Dnet,
                )
                logger.info("on going ...")
                logger.debug("wait for results")
                # ----------------------------------------------------------------------
                # Loading of results
                # ----------------------------------------------------------------------
                if return_Lres:
                    logger.info("loading of the results")
                    os.chdir(path_local_ref)
                    remove_dir(dir_Lres)
                    bash(
                        f"scp -C -r {user_cluster}@{IP_cluster}:{path_cluster}/{job_ref}/{dir_Lres} {path_local_ref}",
                        info=True,
                    )
                    os.chdir(dir_Lres)
                    LLres = load_Lvar("L*")
            finally:
                # Always give the caller its working directory back, even
                # when one of the steps above raised.
                os.chdir(path_local)
            return fuse_LL(LLres) if return_Lres else None
        return wrapper
    return inner
Vous disposez maintenant du décorateur do_on_cluster, qui permet de paralléliser une fonction f0 sur un cluster.
Exemple 1 : minimisation d’une liste de molécules
    # Wrap the minimisation routine so that it runs on the cluster: every
    # element of Lmol is handed to it under the keyword argument "mol".
    minimize_on_cluster = do_on_cluster(
        # what to distribute, and under which keyword name
        Lx0=Lmol,
        x0="mol",
        # cluster connection settings and execution mode
        Dnet=Dnet,
        with_GNU_parallel=True,
        # working-directory name and sizing of the jobs
        ref="Lscav2",
        level=level,
        nLmin_by_job=nLmin_by_job,
        ncore_max=ncore_max,
        njob_max=njob_max,
        Lmodule_to_update=Lmodule_to_update,
        # copy the results back once the job has finished
        return_Lres=True,
    )(get_mol_minimized_with_smina)
    # No extra argument is needed: "mol" is supplied chunk by chunk.
    minimize_on_cluster()
