Advanced Examples

Advanced executor usage

"""Advanced example illustrating the following concepts:

* Maximum number of workers
* Maximum number of tasks per worker
* Worker initialization
* Local log level applied remotely
* Remote logs showing locally
* Interchangeable executors

When using the SLURM executor you need to set the environment variables below
(three of them are optional) and also specify ``--slurm-root-directory``:

.. code-block:: bash

    export SLURM_URL="https://<domain>:<port>"
    export SLURM_RENEWAL_URL="ssh://<domain>:<port>"
    export SLURM_USER="${USER}"                          # optional
    export SLURM_TOKEN=$(scontrol token lifespan=3600)   # optional
    export SLURM_API_VERSION="v0.0.42"                   # optional

    python executor.py slurm --slurm-root-directory=/tmp_14_days

Comparison with thread or process based executors

.. code-block:: bash

    python executor.py thread
    python executor.py process
"""

import argparse
import getpass
import logging
import os
import socket
import sys
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

from pyslurmutils.concurrent.futures import SlurmRestExecutor


def initializer():
    """Set the module-level counter ``GLOBAL_VAR`` to zero.

    ``main`` hands this function to the executor as its ``initializer``;
    ``job`` relies on ``GLOBAL_VAR`` existing before it increments it.
    """
    globals()["GLOBAL_VAR"] = 0


def job():
    """Example task: bump the counter, emit some output and describe the worker.

    Increments the module-level ``GLOBAL_VAR`` (which must already exist,
    see ``initializer``), then prints and logs five messages at the debug,
    info, warning and error levels with a 0.1 s pause between iterations.

    :returns: a string with the host name, process id, number of usable
        CPUs and the value of ``GLOBAL_VAR`` after the increment.
    """
    global GLOBAL_VAR
    GLOBAL_VAR += 1
    host = socket.gethostname()
    pid = os.getpid()
    if hasattr(os, "sched_getaffinity"):
        ncpus = len(os.sched_getaffinity(pid))
    else:
        # os.sched_getaffinity only exists on some Unix platforms; fall back
        # to the total CPU count so the local executors work elsewhere too.
        ncpus = os.cpu_count()
    for i in range(5):
        print(f"print {i} (GLOBAL_VAR={GLOBAL_VAR})")
        logging.debug("log debug %d", i)
        logging.info("log info %d", i)
        logging.warning("log warning %d", i)
        logging.error("log error %d", i)
        time.sleep(0.1)
    return f"{host=}, {pid=}, {ncpus=}, {GLOBAL_VAR=}"


def executor_context(executor_type, slurm_root_directory, **kwargs):
    """Create the executor selected by *executor_type*.

    :param executor_type: ``"slurm"``, ``"process"`` or ``"thread"``.
    :param slurm_root_directory: root directory under which
        ``<user>/slurm_logs`` is used as log directory; only required for
        the ``"slurm"`` executor.
    :param kwargs: forwarded to the executor constructor.
        ``max_tasks_per_worker`` is dropped for the process and thread
        executors.
    :returns: a ``SlurmRestExecutor``, ``ProcessPoolExecutor`` or
        ``ThreadPoolExecutor`` instance.
    :raises ValueError: when *executor_type* is not supported or when the
        SLURM executor is requested without *slurm_root_directory*.
    """
    if executor_type == "slurm":
        # Validate the arguments before doing any environment lookups.
        if not slurm_root_directory:
            raise ValueError("--slurm-root-directory required")

        url = os.environ.get("SLURM_URL")
        token = os.environ.get("SLURM_TOKEN")
        user_name = os.environ.get("SLURM_USER")
        if user_name is None:
            # Only query the login name when SLURM_USER is not set.
            user_name = getpass.getuser()

        log_directory = os.path.join(slurm_root_directory, user_name, "slurm_logs")

        return SlurmRestExecutor(
            url=url,
            token=token,
            user_name=user_name,
            log_directory=log_directory,
            data_directory=None,
            std_split=False,
            **kwargs,
        )

    # The standard-library pools do not accept this keyword.
    kwargs.pop("max_tasks_per_worker", None)

    if executor_type == "process":
        return ProcessPoolExecutor(**kwargs)
    if executor_type == "thread":
        return ThreadPoolExecutor(**kwargs)

    # Raise (not return) so an unknown type fails here instead of at the
    # caller's ``with`` statement.
    raise ValueError(executor_type)


def main(executor_type, slurm_root_directory, log_level):
    """Run six ``job`` tasks on the selected executor and print their results.

    :param executor_type: executor kind passed on to ``executor_context``.
    :param slurm_root_directory: root directory passed on to
        ``executor_context``.
    :param log_level: level used to configure the root logger, which
        writes to standard output.
    """
    logging.basicConfig(level=log_level, stream=sys.stdout)

    executor_options = {
        "max_workers": 2,
        "max_tasks_per_worker": 2,
        "initializer": initializer,
    }
    with executor_context(
        executor_type, slurm_root_directory, **executor_options
    ) as executor:
        pending = [executor.submit(job) for _ in range(6)]
        # Collect results in completion order, not submission order.
        results = [finished.result() for finished in as_completed(pending)]

    print()
    print("Results:")
    for line in results:
        print(line)


if __name__ == "__main__":

    def _log_level(name):
        """Convert a level name such as ``debug`` to its numeric logging level.

        Raises ``argparse.ArgumentTypeError`` for unknown names so that
        argparse reports a usage error instead of a traceback.
        """
        level = getattr(logging, name.upper(), None)
        if not isinstance(level, int):
            raise argparse.ArgumentTypeError(f"invalid log level: {name!r}")
        return level

    # Command line: positional executor type plus optional SLURM root
    # directory and log level.
    parser = argparse.ArgumentParser(
        description="Run jobs using different executor types."
    )
    parser.add_argument(
        "executor_type",
        choices=["process", "thread", "slurm"],
        help="The type of executor to use (process, thread, slurm).",
    )
    parser.add_argument(
        "--slurm-root-directory",
        type=str,
        default=None,
        help="For logs files when using slurm.",
    )
    parser.add_argument(
        "--log-level",
        type=_log_level,
        default=logging.INFO,
        help="For logs.",
    )

    args = parser.parse_args()
    main(args.executor_type, args.slurm_root_directory, args.log_level)

Pre-emptive job scheduling

"""Advanced example illustrating pre-emptive job scheduling.

.. code-block:: bash

    export SLURM_TOKEN=$(scontrol token lifespan=3600)
    export SLURM_API_VERSION="v0.0.42"  # optional
    export SLURM_URL="https://<domain>:<port>"
    export SLURM_RENEWAL_URL="ssh://<domain>:<port>"
    export SLURM_USER=...

    python preemptive.py --slurm-root-directory=/tmp_14_days
"""

import argparse
import getpass
import logging
import os
import sys
import time

from pyslurmutils.concurrent.futures import SlurmRestExecutor

# Module-level logger; main() uses it to report progress.
logger = logging.getLogger(__name__)


def main(slurm_root_directory, log_level):
    """Submit a job, wait past the SLURM job time limit, then submit another.

    The connection settings are read from the ``SLURM_URL``, ``SLURM_TOKEN``
    and ``SLURM_USER`` environment variables (the user name defaults to the
    current login name). Logs go to ``<slurm_root_directory>/<user>/slurm_logs``.

    :param slurm_root_directory: root directory for the SLURM log files.
    :param log_level: level used to configure the root logger, which
        writes to standard output.
    :raises AssertionError: when a submitted ``sum([1, 1])`` job does not
        return 2.
    """
    logging.basicConfig(level=log_level, stream=sys.stdout)

    url = os.environ.get("SLURM_URL")
    token = os.environ.get("SLURM_TOKEN")
    user_name = os.environ.get("SLURM_USER", getpass.getuser())

    log_directory = os.path.join(slurm_root_directory, user_name, "slurm_logs")

    with SlurmRestExecutor(
        url=url,
        token=token,
        user_name=user_name,
        log_directory=log_directory,
        lazy_scheduling=False,
        max_workers=2,
        max_tasks_per_worker=10,
        parameters={"time_limit": "00:01:00"},
    ) as executor:

        # The submissions are kept out of ``assert`` statements: those are
        # removed under ``python -O``, which would skip the jobs entirely.
        logger.info("execute a job ...")
        result = executor.submit(sum, [1, 1]).result()
        if result != 2:
            raise AssertionError(f"unexpected job result: {result!r}")

        logger.info("wait longer than the job time limit ...")
        time.sleep(120)

        logger.info("execute a job ...")
        result = executor.submit(sum, [1, 1]).result()
        if result != 2:
            raise AssertionError(f"unexpected job result: {result!r}")


if __name__ == "__main__":

    def _log_level(name):
        """Convert a level name such as ``debug`` to its numeric logging level.

        Raises ``argparse.ArgumentTypeError`` for unknown names so that
        argparse reports a usage error instead of a traceback.
        """
        level = getattr(logging, name.upper(), None)
        if not isinstance(level, int):
            raise argparse.ArgumentTypeError(f"invalid log level: {name!r}")
        return level

    # Command line: required SLURM root directory plus optional log level.
    parser = argparse.ArgumentParser(
        description="Jobs stay alive within the executor context."
    )
    parser.add_argument(
        "--slurm-root-directory",
        type=str,
        required=True,
        help="For logs files.",
    )
    parser.add_argument(
        "--log-level",
        type=_log_level,
        default=logging.INFO,
        help="For logs.",
    )

    args = parser.parse_args()
    main(args.slurm_root_directory, args.log_level)