"""Advanced example illustrating the following concepts:
* Maximum number of workers
* Maximum number of tasks per worker
* Worker initialization
* Local log level applied remotely
* Remote logs showing locally
* Interchangeable executors
When using the SLURM executor you need to set the environment variables below
(those marked optional may be omitted) and also specify ``--slurm-root-directory``:
.. code-block:: bash
export SLURM_URL="https://<domain>:<port>"
export SLURM_RENEWAL_URL="ssh://<domain>:<port>"
export SLURM_USER="${USER}" # optional
export SLURM_TOKEN=$(scontrol token lifespan=3600) # optional
export SLURM_API_VERSION="v0.0.42" # optional
python executor.py slurm --slurm-root-directory=/tmp_14_days
Comparison with thread- or process-based executors:
.. code-block:: bash
python executor.py thread
python executor.py process
"""
import argparse
import getpass
import logging
import os
import socket
import sys
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from pyslurmutils.concurrent.futures import SlurmRestExecutor
def initializer():
    """Create the module-level ``GLOBAL_VAR`` counter, or reset it to zero.

    Intended to be handed to an executor as its ``initializer`` callable so
    that the counter exists before ``job`` increments it.
    """
    global GLOBAL_VAR

    GLOBAL_VAR = 0
def job():
    """Run one demo task and report where it was executed.

    Increments the module-level ``GLOBAL_VAR`` counter, which must already
    exist (it is created by the executor's ``initializer``). It then emits
    five rounds of ``print`` output and of log records at DEBUG, INFO,
    WARNING and ERROR level, sleeping 0.1 seconds after each round.

    :returns: a string with the host name, the process id, the number of
        CPUs available to the process and the counter value after the
        increment.
    """
    global GLOBAL_VAR
    GLOBAL_VAR += 1

    host = socket.gethostname()
    pid = os.getpid()
    # ``os.sched_getaffinity`` is only available on some Unix platforms.
    # Fall back to the total CPU count elsewhere so the thread and process
    # executors can still be demonstrated.
    if hasattr(os, "sched_getaffinity"):
        ncpus = len(os.sched_getaffinity(pid))
    else:
        ncpus = os.cpu_count()

    for i in range(5):
        print(f"print {i} (GLOBAL_VAR={GLOBAL_VAR})")
        logging.debug("log debug %d", i)
        logging.info("log info %d", i)
        logging.warning("log warning %d", i)
        logging.error("log error %d", i)
        time.sleep(0.1)

    # The local names are part of the output because of the ``{name=}`` syntax.
    return f"{host=}, {pid=}, {ncpus=}, {GLOBAL_VAR=}"
def executor_context(executor_type, slurm_root_directory, **kwargs):
    """Create the executor selected by ``executor_type``.

    :param executor_type: ``"slurm"``, ``"process"`` or ``"thread"``.
    :param slurm_root_directory: root directory under which
        ``<user_name>/slurm_logs`` is used as log directory. Only used, and
        then required, for the ``"slurm"`` executor.
    :param kwargs: forwarded to the executor constructor. The
        ``max_tasks_per_worker`` argument is dropped for the process and
        thread executors.
    :returns: the executor instance (the caller uses it as a context manager).
    :raises ValueError: when ``executor_type`` is unknown or when
        ``slurm_root_directory`` is missing for the ``"slurm"`` executor.
    """
    if executor_type == "slurm":
        # Validate the arguments before touching the environment.
        if not slurm_root_directory:
            raise ValueError("--slurm-root-directory required")

        url = os.environ.get("SLURM_URL")
        token = os.environ.get("SLURM_TOKEN")
        user_name = os.environ.get("SLURM_USER", getpass.getuser())
        log_directory = os.path.join(slurm_root_directory, user_name, "slurm_logs")
        return SlurmRestExecutor(
            url=url,
            token=token,
            user_name=user_name,
            log_directory=log_directory,
            data_directory=None,
            std_split=False,
            **kwargs,
        )

    # Not passed on to the standard library pools below.
    kwargs.pop("max_tasks_per_worker", None)

    if executor_type == "process":
        return ProcessPoolExecutor(**kwargs)
    if executor_type == "thread":
        return ThreadPoolExecutor(**kwargs)

    # Raise (not return) so that the caller fails here with a clear message
    # instead of failing later when using the result as a context manager.
    raise ValueError(f"Unknown executor type: {executor_type!r}")
def main(executor_type, slurm_root_directory, log_level):
    """Submit six jobs to the selected executor and print their results.

    :param executor_type: executor selection passed to ``executor_context``.
    :param slurm_root_directory: passed to ``executor_context``.
    :param log_level: level for the root logger, which logs to stdout.
    """
    logging.basicConfig(stream=sys.stdout, level=log_level)

    with executor_context(
        executor_type,
        slurm_root_directory,
        max_workers=2,
        max_tasks_per_worker=2,
        initializer=initializer,
    ) as executor:
        pending = [executor.submit(job) for _ in range(6)]
        # Collect results in completion order, not submission order.
        outcomes = [finished.result() for finished in as_completed(pending)]

    print()
    print("Results:")
    for outcome in outcomes:
        print(outcome)
def log_level_from_name(name):
    """Convert a log level name to its numeric ``logging`` value.

    :param name: level name in any letter case, e.g. ``"debug"``.
    :returns: the matching ``logging`` level constant.
    :raises argparse.ArgumentTypeError: when ``name`` is not a log level
        name, so that ``argparse`` reports a usage error instead of a
        traceback.
    """
    level_names = (
        "CRITICAL",
        "FATAL",
        "ERROR",
        "WARNING",
        "WARN",
        "INFO",
        "DEBUG",
        "NOTSET",
    )
    normalized = name.upper()
    # Only accept real level names, not arbitrary attributes of ``logging``.
    if normalized not in level_names:
        raise argparse.ArgumentTypeError(
            f"invalid log level {name!r} (choose from {', '.join(level_names)})"
        )
    return getattr(logging, normalized)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run jobs using different executor types."
    )
    parser.add_argument(
        "executor_type",
        choices=["process", "thread", "slurm"],
        help="The type of executor to use (process, thread, slurm).",
    )
    parser.add_argument(
        "--slurm-root-directory",
        type=str,
        default=None,
        help="For logs files when using slurm.",
    )
    parser.add_argument(
        "--log-level",
        type=log_level_from_name,
        default=logging.INFO,
        help="For logs.",
    )
    args = parser.parse_args()
    main(args.executor_type, args.slurm_root_directory, args.log_level)