Source code for pyslurmutils.client.log_utils
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Generator
from typing import List
from typing import Optional
[docs]
def monitor_log_file(
    file_path: str, stop_event: threading.Event, period: float = 0.5
) -> None:
    """Redirect a log file with the following structure to the local root logger

    .. code

        [SLURM15733578] [INFO] [root] Your log message here
        [SLURM15733578] [ERROR] [package.module] Your log message here
        ...

    A line matching this structure starts a new record, which is logged on the
    root logger as ``[job id] [logger name] message`` with the level named in
    the line. Any other line (for example a traceback) is appended to the
    record in progress. Lines seen before the first header are logged with
    level ``INFO``.

    The function blocks until ``stop_event`` is set. It first waits for the
    file to appear, then follows it. Once the stop event is observed, the
    remainder of the file is read one last time and the record in progress is
    emitted, so the final message of the file is not lost.

    :param file_path: path of the log file to follow.
    :param stop_event: set this event to make the function return.
    :param period: seconds to sleep between polls while waiting for the file
        or for new content.
    """
    # ``os.listdir("")`` raises FileNotFoundError, so a bare file name must be
    # mapped onto the current directory explicitly.
    dir_name = os.path.dirname(file_path) or os.curdir
    while not os.path.exists(file_path) and not stop_event.is_set():
        time.sleep(period)
        # NOTE(review): presumably listing the parent directory refreshes a
        # network file system's directory cache so that the new file becomes
        # visible to os.path.exists -- confirm.
        _ = os.listdir(dir_name)
    if not os.path.exists(file_path):
        # Stopped before the file ever appeared: nothing to redirect.
        return

    last_log_level = logging.INFO
    current_record: List[str] = []
    # Text read so far of a line whose trailing newline has not been written yet.
    pending = ""

    # errors="replace": a stray undecodable byte must not kill the monitor.
    with open(file_path, "r", errors="replace") as log_file:
        stopping = stop_event.is_set()
        while True:
            chunk = log_file.readline()
            pending += chunk
            at_eof = not chunk

            # A line is complete when it ends with a newline. An unterminated
            # tail is only accepted during the final drain, since the writer
            # will not complete it anymore.
            if pending.endswith("\n") or (pending and at_eof and stopping):
                header = _parse_log_header(pending)
                if header is None:
                    current_record.append(pending.rstrip())
                else:
                    # A new header closes the record in progress.
                    _emit_record(last_log_level, current_record)
                    last_log_level, text = header
                    current_record.append(text)
                pending = ""

            if at_eof:
                if stopping:
                    break
                time.sleep(period)
            # Once stopping, keep reading until the end of the file is reached.
            stopping = stopping or stop_event.is_set()

    # The last record has no following header to trigger its emission.
    _emit_record(last_log_level, current_record)


def _parse_log_header(line: str) -> Optional[tuple]:
    """Return ``(level, text)`` when the line starts a new record, else ``None``."""
    parts = line.split(" ", 3)
    if len(parts) < 4:
        return None
    # Require the brackets so that ordinary text which happens to contain a
    # level name as second word is not mistaken for a header.
    if not all(part.startswith("[") and part.endswith("]") for part in parts[:3]):
        return None
    job_id, level_name, logger_name = (part.strip("[]") for part in parts[:3])
    level = getattr(logging, level_name, None)
    # bool is a subclass of int: reject flags such as ``logging.raiseExceptions``.
    if isinstance(level, bool) or not isinstance(level, int):
        return None
    return level, f"[{job_id}] [{logger_name}] {parts[3].rstrip()}"


def _emit_record(level: int, lines: List[str]) -> None:
    """Log the buffered lines as one root-logger message and empty the buffer."""
    if lines:
        logging.log(level, "\n".join(lines))
        lines.clear()
[docs]
@contextmanager
def log_file_monitor_context(
    file_paths: List[Optional[str]],
) -> Generator[None, None, None]:
    """Forward log files to the local root logger while the context is active.

    Entries that are ``None``, empty or ``"/dev/null"`` are ignored and
    duplicate paths are monitored only once. Every remaining file is followed
    by ``monitor_log_file`` in its own worker thread.

    On exit (normal or through an exception) the monitors are asked to stop
    and the context waits for them to finish. A monitor that raised is
    reported as a warning with its traceback; the exception is not propagated.
    """
    monitored = set()
    for candidate in file_paths:
        if candidate and candidate != "/dev/null":
            monitored.add(candidate)

    if monitored:
        stop_event = threading.Event()
        # One worker per file so that all monitors run concurrently.
        with ThreadPoolExecutor(max_workers=len(monitored)) as executor:
            path_by_future = {
                executor.submit(monitor_log_file, path, stop_event): path
                for path in monitored
            }
            try:
                yield
            finally:
                stop_event.set()
                for future, path in path_by_future.items():
                    try:
                        future.result()
                    except Exception:
                        logging.warning("monitoring %s failed", path, exc_info=True)
    else:
        # Nothing to monitor: behave as a no-op context.
        yield