# coding: utf-8
"""
Collection of low-level tools to facilitate the interface with resource managers.
The preferred way of importing this module is:
import qutils as qu
"""
from __future__ import annotations
import os
from subprocess import Popen, PIPE, run
from monty.string import is_string
from pymatgen.core.units import Time, Memory, UnitError
from abipy.tools.typing import PathLike
from abipy.tools import duck
from abipy.tools.text import rm_multiple_spaces
[docs]
def slurm_parse_timestr(s: str) -> Time:
    """
    A slurm time parser. Accepts a string in one of the following forms:

        # "days-hours",
        # "days-hours:minutes",
        # "days-hours:minutes:seconds".
        # "minutes",
        # "minutes:seconds",
        # "hours:minutes:seconds",

    NB: if ``s`` is number-like (according to ``duck.is_number_like``), it is
    passed directly to ``Time`` as a value in seconds and none of the forms
    above is tried.

    Returns: Time in seconds.

    Raises:
        `ValueError` if string is not valid.
    """
    if duck.is_number_like(s):
        return Time(s, "s")

    days, hours, minutes, seconds = 0, 0, 0, 0

    # A leading "days-" prefix changes the meaning of the shorter forms:
    # the first field after the dash is hours instead of minutes.
    has_days = '-' in s
    if has_days:
        days, s = s.split("-")
        days = int(days)

    ncolons = s.count(':')
    if ncolons > 2:
        raise ValueError("More than 2 ':' in string!")

    if ncolons == 0:
        # Single field: "days-hours" or "minutes" (float strings are truncated).
        value = int(float(s))
        if has_days:
            hours = value
        else:
            minutes = value
    elif ncolons == 1:
        # Two fields: "days-hours:minutes" or "minutes:seconds".
        first, second = map(int, s.split(':'))
        if has_days:
            hours, minutes = first, second
        else:
            minutes, seconds = first, second
    else:
        # Three fields always mean hours:minutes:seconds.
        hours, minutes, seconds = map(int, s.split(':'))

    return Time((days*24 + hours)*3600 + minutes*60 + seconds, "s")
[docs]
def time2slurm(timeval: float, unit="s") -> str:
    """
    Format a time value as a string in the slurm convention "days-hours:minutes:seconds".

    Args:
        timeval: Number representing the time value.
        unit: Unit of `timeval` (Default: seconds).

    >>> assert time2slurm(61) == '0-0:1:1' and time2slurm(60*60+1) == '0-1:0:1'
    >>> assert time2slurm(0.5, unit="h") == '0-0:30:0'
    """
    secs_per_minute = 60
    secs_per_hour = 3600
    secs_per_day = 24*3600

    total_secs = Time(timeval, unit).to("s")

    # Peel off days, then hours, then minutes; what is left are the seconds.
    ndays, remainder = divmod(total_secs, secs_per_day)
    nhours, remainder = divmod(remainder, secs_per_hour)
    nminutes, nsecs = divmod(remainder, secs_per_minute)

    return "%d-%d:%d:%d" % (ndays, nhours, nminutes, nsecs)
[docs]
def time2pbspro(timeval: float, unit="s") -> str:
    """
    Format a time value as a string in the PbsPro convention "hours:minutes:seconds".

    Args:
        timeval: Number representing the time value.
        unit: Unit of `timeval` (Default: seconds).

    >>> assert time2pbspro(2, unit="d") == '48:0:0'
    """
    secs_per_minute = 60
    secs_per_hour = 3600

    total_secs = Time(timeval, unit).to("s")

    # Hours are not wrapped into days: 2 days gives 48 hours.
    nhours, remainder = divmod(total_secs, secs_per_hour)
    nminutes, nsecs = divmod(remainder, secs_per_minute)

    return "%d:%d:%d" % (nhours, nminutes, nsecs)
[docs]
def time2loadlever(timeval: float, unit="s") -> str:
    """
    Format a time value as a string in the LoadLever convention hh:mm:ss
    (hours:minutes:seconds), with minutes and seconds zero-padded to two digits.

    Args:
        timeval: Number representing the time value.
        unit: Unit of `timeval` (Default: seconds).

    >>> assert time2loadlever(2, unit="d") == '48:00:00'
    """
    secs_per_minute = 60
    secs_per_hour = 3600

    total_secs = Time(timeval, unit).to("s")

    nhours, remainder = divmod(total_secs, secs_per_hour)
    nminutes, nsecs = divmod(remainder, secs_per_minute)

    return "%d:%02d:%02d" % (nhours, nminutes, nsecs)
[docs]
def timelimit_parser(s):
    """
    Convert a float or a string into time in seconds.

    Values accepted by ``float`` are taken as seconds; any other string
    is handed over to ``slurm_parse_timestr``.
    """
    try:
        nsecs = float(s)
    except ValueError:
        # Not a plain number: try the slurm time syntax.
        return slurm_parse_timestr(s)

    return Time(nsecs, "s")
[docs]
def any2mb(s):
    """
    Convert string or number to memory in megabytes.

    Non-string input is simply cast to int. Strings are parsed with pymatgen
    ``Memory``; several spellings are tried to support different pymatgen versions.
    """
    if not is_string(s):
        return int(s)

    try:
        # latest pymatgen version (as of july 2024)
        megabytes = Memory.from_str(s.upper()).to("MB")
    except (KeyError, UnitError):
        # For backward compatibility with older pymatgen versions
        legacy_str = s.replace("B", "b")
        try:
            megabytes = Memory.from_str(legacy_str).to("Mb")
        except AttributeError:
            # For even older pymatgen versions
            megabytes = Memory.from_string(legacy_str).to("Mb")

    return int(megabytes)
[docs]
def slurm_get_jobs() -> dict[int, dict]:
    """
    Invoke `squeue --me -o %all` and parse its pipe-separated output.

    Return: dictionary mapping the integer job id to a dictionary with the job info
        (column name --> string value, except for JOBID that is converted to int).
    """
    # Based on https://gist.github.com/stevekm/7831fac98473ea17d781330baa0dd7aa
    process = Popen(["squeue", "--me", "-o", '%all'],
                    stdout=PIPE, stderr=PIPE, shell=False, universal_newlines=True)
    stdout, _ = process.communicate()

    rows = stdout.split('\n')
    # First row gives the column names.
    column_names = rows.pop(0).split('|')
    ncols = len(column_names)

    jobs = {}
    for row in rows:
        values = row.split('|')
        if len(values) != ncols:
            # Malformed row (e.g. the trailing empty line): ignored for the time being.
            continue

        info = {}
        for name, value in zip(column_names, values):
            info[name] = int(value) if name == "JOBID" else value

        jobs[info["JOBID"]] = info

    return jobs
[docs]
class SlurmJobArray:
    """
    Build and submit a Slurm job array in which each task executes the same
    command with a different string of command line options.

    Example:

        header = '''\
        #!/bin/bash
        #SBATCH --account=battab
        #SBATCH --job-name=abiml_md
        #SBATCH --time=0-16:0:0
        #SBATCH --partition=batch
        #SBATCH --nodes=1 # 1 node has 128 cores
        #SBATCH --ntasks-per-node=1
        #SBATCH --cpus-per-task=1
        conda activate env3.10
        export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
        ulimit -s unlimited
        '''
        command = "abiml.py md"
        arr_options = ["--help", "--version"]
        job_array = SlurmJobArray(header, command, arr_options)
        print(job_array)
        queue_id = job_array.sbatch("job_array.sh")
    """

    def __init__(self, header: str, command: str, arr_options: list[str]):
        """
        Args:
            header: Initial part of the submission script. Must contain at least
                one line starting with #SBATCH.
            command: Command to be executed by each task of the array.
            arr_options: List of strings. The i-th string gives the options
                passed to `command` by the task with index i.
        """
        self.command = command
        # Make sure the command ends with a whitespace.
        if not self.command.endswith(" "):
            self.command += " "
        self.header = header
        self.arr_options = arr_options
        # One entry per line: the script splits this string on newlines.
        self.arr_options_str = rm_multiple_spaces("\n".join(arr_options))

    def __str__(self):
        """
        Return the submission script as a string.

        Raises:
            ValueError: if the header does not contain a line starting with #SBATCH.
        """
        # Add slurm array section just before the first #SBATCH line.
        lines = self.header.splitlines()
        for il, line in enumerate(lines):
            if line.startswith("#SBATCH"):
                break
        else:
            raise ValueError("Cannot find line starting with #SBATCH")

        lines.insert(il, f"#SBATCH --array=0-{len(self.arr_options)-1}")
        header = "\n".join(lines)

        select_opts = r"""
# multiline string
OPTS_STRING="
%s
"
# Convert the multiline string to an array
IFS=$'\n' read -rd '' -a OPTS_LIST <<< "$OPTS_STRING"
# Index of the entry you want (0-based)
index=${SLURM_ARRAY_TASK_ID}
# Check if the index is within the range of the array
OPTS="${OPTS_LIST[index]}"
echo "Running entry at index $index:\nwith OPTS=$OPTS"
env
""" % (self.arr_options_str)

        end = f"""
{self.command} ${{OPTS}} > job_${{index}}.log 2> job_${{index}}.err
# Remove the file with the Slurm job id
me=$(basename "$0")
rm ${{me}}.qid
"""
        return header + select_opts + end

    def sbatch(self, slurm_filepath: PathLike) -> int:
        """
        Write slurm submission script to slurm_filepath and submits it.
        Return Slurm JOB id.

        Raises:
            RuntimeError: if the file `<slurm_filepath>.qid` already exists.
                In this case nothing is written or submitted.
        """
        # Accept Path objects as well: the .qid path is built by string concatenation.
        slurm_filepath = str(slurm_filepath)

        # Make sure no slurm job is already running by checking for a .qid file.
        path_qid = slurm_filepath + ".qid"
        if os.path.exists(path_qid):
            with open(path_qid, "rt") as fh:
                # The job id is the text before the "#" comment, if any.
                qid_text = fh.read().split("#")[0].strip()
            try:
                queue_id = int(qid_text)
            except ValueError:
                # Unexpected content: report it as is. The existence of the file is what matters.
                queue_id = qid_text

            err_msg = f"Found slurm job ID {queue_id} in {path_qid}\n" + \
                      "This usually indicates that a similar array job is already running\n" + \
                      f"If this is not the case, please remove {path_qid} and rerun the script."
            raise RuntimeError(err_msg)

        with open(slurm_filepath, "wt") as fh:
            fh.write(str(self))

        queue_id = slurm_sbatch(slurm_filepath)
        return queue_id
[docs]
def slurm_write_and_sbatch(script_filepath: str, slurm_script_str: str) -> int:
    """
    Save the string `slurm_script_str` in the file `script_filepath`,
    then submit the file with `slurm_sbatch`.

    Return: Slurm JOB ID.
    """
    with open(script_filepath, "wt") as script_file:
        script_file.write(slurm_script_str)

    queue_id = slurm_sbatch(script_filepath)
    return queue_id
[docs]
def slurm_sbatch(slurm_filepath: PathLike) -> int:
    """
    Submit a job script to the queue with Slurm sbatch. Return Slurm JOB ID.

    sbatch is executed inside the directory containing the script.
    If the submission succeeds, the job id is also saved in the
    file `<slurm_filepath>.qid` located in the same directory as the script.

    Args:
        slurm_filepath: Path to the submission script (absolute or relative
            to the current working directory).

    Raises:
        RuntimeError: if sbatch returns a non-zero exit code.
        IndexError, ValueError: if the job id cannot be extracted from the output of sbatch.
    """
    # Use an absolute path so that the script and its .qid file are still found
    # when sbatch runs in the script directory, and so that a bare filename
    # does not produce an empty directory name.
    slurm_filepath = os.path.abspath(str(slurm_filepath))
    dirpath = os.path.dirname(slurm_filepath)

    # Run sbatch in dirpath via cwd instead of changing the directory of the whole process.
    # need string not bytes so must use universal_newlines
    process = Popen(["sbatch", slurm_filepath], stdout=PIPE, stderr=PIPE,
                    universal_newlines=True, cwd=dirpath)
    out, err = process.communicate()

    # grab the returncode. SLURM returns 0 if the job was successful
    if process.returncode != 0:
        raise RuntimeError(f"Error while submitting {slurm_filepath=} with {process.returncode=},\n{out=}\n{err=}")

    try:
        # sbatch is expected to print a line of the form "Submitted batch job 123456":
        # the job id is the fourth token.
        queue_id = int(out.split()[3])
    except (IndexError, ValueError):
        # probably error parsing job code
        print('Could not parse job id following slurm...')
        raise

    path_qid = slurm_filepath + ".qid"
    print(f"Job submission was successful and queue_id: {queue_id}")
    print("Saving slurm job ID in:", path_qid)
    with open(path_qid, "wt") as fh:
        fh.write(str(queue_id) + " # Slurm job id")

    return queue_id
[docs]
def get_sacct_info():
    """
    Run the sacct command to get the job information.

    Return: list of dictionaries, one for each line printed by sacct, with the keys:
        JobID, JobName, Partition, Account, AllocCPUS, State, ExitCode.
        The list is empty if sacct prints nothing.
        None if sacct cannot be executed or if it returns a non-zero exit code
        (an error message is printed in this case).
    """
    fields = ['JobID', 'JobName', 'Partition', 'Account', 'AllocCPUS', 'State', 'ExitCode']

    try:
        # --parsable2 gives "|"-separated fields so that empty values or values
        # containing spaces do not shift the columns.
        result = run(['sacct', '--format=' + ",".join(fields), '--noheader', '--parsable2'],
                     stdout=PIPE, stderr=PIPE, text=True)
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

    # Check if the command was successful
    if result.returncode != 0:
        print(f"Error running sacct: {result.stderr}")
        return None

    # Process the output, skipping empty lines so that no empty record is produced.
    return [dict(zip(fields, line.split('|')))
            for line in result.stdout.splitlines() if line.strip()]
[docs]
def get_completed_job_info(job_id: int | str):
    """
    Run sacct to get accounting information on the job with the given `job_id`.

    Args:
        job_id: Slurm job identifier.

    Return: list of dictionaries, one for each line printed by sacct, with the keys:
        JobID, JobName, Partition, Account, AllocCPUS, State, ExitCode,
        Start, End, Elapsed, TotalCPU, MaxRSS.
        The list is empty if sacct prints nothing for this job.
        None if sacct cannot be executed or if it returns a non-zero exit code
        (an error message is printed in this case).
    """
    # Define the fields we want to retrieve
    fields = "JobID,JobName,Partition,Account,AllocCPUS,State,ExitCode,Start,End,Elapsed,TotalCPU,MaxRSS"

    try:
        # Run the sacct command with the specified fields for the given job ID
        result = run(
            ['sacct', '--jobs', str(job_id), '--format', fields, '--noheader', '--parsable2'],
            stdout=PIPE, stderr=PIPE, text=True
        )
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

    # Check if the command was successful
    if result.returncode != 0:
        print(f"Error running sacct: {result.stderr}")
        return None

    # Process the output, skipping empty lines so that no bogus record is produced.
    keys = fields.split(',')
    return [dict(zip(keys, line.split('|')))
            for line in result.stdout.splitlines() if line.strip()]
[docs]
def get_slurm_template(body: str) -> str:
    """
    Build a slurm submission script that is supposed to be customized by the user.

    Args:
        body: Text inserted between the lines printing the start and the completion date.
    """
    template = """\
#!/bin/bash
#
# Please CUSTOMIZE this section according to your cluster and the type of calculation
#
#SBATCH --job-name=my_job
#SBATCH --output=%j_%x.slurm.out
#SBATCH --error=%j_%x.slurm.err
#SBATCH --partition=debug
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=64
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=2G
#SBATCH --time=2:00:00
#SBATCH --account=htforft
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
# ------------------------------------------------------------------------------
# Printing some information
# ------------------------------------------------------------------------------
echo "------------------- Job info -------------------"
echo "job_id : $SLURM_JOB_ID"
echo "jobname : $SLURM_JOB_NAME"
echo "queue : $SLURM_JOB_PARTITION"
echo "qos : $SLURM_JOB_QOS"
echo "account : $SLURM_JOB_ACCOUNT"
echo "submit dir : $SLURM_SUBMIT_DIR"
echo "number of mpi tasks: $SLURM_NTASKS tasks"
echo "OMP_NUM_THREADS : $OMP_NUM_THREADS"
echo "number of gpus : $SLURM_GPUS_ON_NODE"
echo "------------------- Node list ------------------"
echo $SLURM_JOB_NODELIST
echo "---------------- Printing limits ---------------"
ulimit -s unlimited
ulimit -a
# ------------------------------------------------------------------------------
# Setting up the environment
# ------------------------------------------------------------------------------
echo "----------------- Environment ------------------"
source $HOME/vasp.6.2.1/modules.sh
module list
echo "--------------- Running the code ---------------"
echo -n "This run started on: "
date
{body}
echo -n "This run completed on: "
date
"""
    # {body} is the only replacement field in the template.
    return template.format(body=body)
[docs]
def get_custodian_template() -> str:
    """
    Return the source of a python script that runs VASP through Custodian
    with VaspErrorHandler and UnconvergedErrorHandler.
    """
    script = """\
#!/usr/bin/env python
from custodian.custodian import Custodian
from custodian.vasp.jobs import VaspJob
from custodian.vasp.handlers import VaspErrorHandler, UnconvergedErrorHandler
# List of error handlers
handlers = [
VaspErrorHandler(), # Handles common VASP errors
UnconvergedErrorHandler() # Handles unconverged calculations
]
# Define the VASP job with appropriate command and parameters
jobs = [
VaspJob(
#["mpirun", "vasp_std"], # Replace NCORES with the number of cores
#["mpirun", "-np", "1", "vasp_std"], # Replace NCORES with the number of cores
["mpirun", "vasp_std"], # Replace NCORES with the number of cores
auto_npar=False,
final=True
)
]
# Create the Custodian instance with handlers and jobs
c = Custodian(handlers, jobs, max_errors=5)
# Run the Custodian job
c.run()
"""
    return script