# coding: utf-8
"""Interface for pseudopotential generators."""
from __future__ import annotations
import abc
import os
import tempfile
import collections
import shutil
import time
from typing import Optional
from shutil import which
from monty.termcolor import cprint
from abipy.flowtk.pseudos import Pseudo
from abipy.ppcodes.oncv_parser import OncvParser
import logging
logger = logging.getLogger(__name__)
# Possible status of the PseudoGenerator.
_STATUS2STR = collections.OrderedDict([
(1, "Initialized"), # PseudoGenerator has been initialized
(2, "Running"), # PseudoGenerator is running.
(3, "Done"), # Calculation done, This does not imply that results are OK
(4, "Error"), # PP generator error.
(5, "Completed"), # Execution completed successfully.
])
[docs]
class Status(int):
"""
An integer representing the status of the 'PseudoGenerator`.
"""
def __repr__(self) -> str:
return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self))
def __str__(self) -> str:
"""String representation."""
return _STATUS2STR[self]
[docs]
@classmethod
def as_status(cls, obj: Status | str) -> Status:
"""Convert obj into Status."""
if isinstance(obj, cls):
return obj
else:
# Assume string
return cls.from_string(obj)
[docs]
@classmethod
def from_string(cls, s: str) -> Status:
"""Return an instance from its string representation."""
for num, text in _STATUS2STR.items():
if text == s:
return cls(num)
else:
raise ValueError(f"Wrong string: `{s}`")
class _PseudoGenerator(metaclass=abc.ABCMeta):
"""
This object receives a string with the input file and generates a pseudopotential.
It calls the pp generator in a subprocess to produce the results in a temporary directory.
It also provides an interface to validate/analyze/plot the results
produced by the pseudopotential code.
Concrete classes must:
1) call super().__init__() in their constructor.
2) the object should have the input file stored in self.input_str
Attributes:
workdir: Working directory (output results are produced in workdir)
status: Flag defining the status of the ps generator.
retcode: Return code of the code
parser: Output parser. None if results are not available because
the calculations is still running or errors
pseudo:
:class:`Pseudo` object. None if not available
"""
# Possible status
S_INIT = Status.from_string("Initialized")
S_RUN = Status.from_string("Running")
S_DONE = Status.from_string("Done")
S_ERROR = Status.from_string("Error")
S_OK = Status.from_string("Completed")
ALL_STATUS = [
S_INIT,
S_RUN,
S_DONE,
S_ERROR,
S_OK,
]
# Basenames for stdin/stdout/stderr.
stdin_basename: str = "run.in"
stdout_basename: str = "run.out"
stderr_basename: str = "run.err"
def __init__(self, workdir: Optional[str] = None) -> None:
# Set the initial status.
self.set_status(self.S_INIT)
self._parser = None
if workdir is not None:
workdir = os.path.abspath(workdir)
if os.path.exists(workdir):
raise RuntimeError(f"workdir `{workdir}` already exists")
self.workdir = workdir
else:
# Build a temporary directory
self.workdir = tempfile.mkdtemp(prefix=self.__class__.__name__)
def __repr__(self) -> str:
return "<%s at %s>" % (self.__class__.__name__, self.workdir)
def __str__(self) -> str:
return "<%s at %s, status=%s>" % (self.__class__.__name__, self.workdir, self.status)
@property
def stdin_path(self) -> str:
"""Absolute path of the standard input."""
return os.path.join(self.workdir, self.stdin_basename)
@property
def stdout_path(self) -> str:
"""Absolute path of the standard output."""
return os.path.join(self.workdir, self.stdout_basename)
@property
def stderr_path(self) -> str:
"""Absolute path of the standard error."""
return os.path.join(self.workdir, self.stderr_basename)
@property
def status(self) -> Status:
"""The status of the job."""
return self._status
@property
def retcode(self) -> int | None:
"""
Return code of the subprocess.
None if not available because e.g. the job has not been started yet.
"""
try:
return self._retcode
except AttributeError:
return None
@property
def parser(self):
return self._parser
#@property
#def pseudo(self) -> Pseudo | None:
# """Pseudo object or None if not available"""
# try:
# return self._pseudo
# except AttributeError:
# return None
@property
def executable(self) -> str:
"""Name of the executable."""
return self._executable
@property
def input_str(self) -> str:
"""String with the input file."""
return self._input_str
def start(self) -> int:
""""
Run the calculation in a subprocess (non-blocking interface)
Return 1 if calculation started, 0 otherwise.
"""
if self.status >= self.S_RUN:
return 0
with open(self.stdin_path, "w") as fh:
fh.write(self.input_str)
# Start the calculation in a subprocess and return.
args = [self.executable, "<", self.stdin_path, ">", self.stdout_path, "2>", self.stderr_path]
self.cmd_str = " ".join(args)
from subprocess import Popen, PIPE
self.process = Popen(self.cmd_str, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.workdir)
self.set_status(self.S_RUN, info_msg="Start on %s" % time.asctime)
return 1
def start_and_wait(self) -> int:
"""
Run the calculation in a subprocess, wait for it and return exit status.
"""
self.start()
retcode = self.wait()
return retcode
def poll(self) -> int:
"""
Check if child process has terminated. Set and return returncode attribute.
"""
self._retcode = self.process.poll()
if self._retcode is not None:
self.set_status(self.S_DONE)
return self._retcode
def wait(self) -> int:
"""
Wait for child process to terminate. Set and return returncode attribute.
"""
self._retcode = self.process.wait()
self.set_status(self.S_DONE)
return self._retcode
def kill(self) -> None:
"""Kill the child."""
self.process.kill()
self.set_status(self.S_ERROR)
self.errors.append("Process has been killed by host code.")
self._retcode = self.process.returncode
def set_status(self, status, info_msg=None):
"""
Set the status.
Args:
status: Status object or string representation of the status
info_msg: string with human-readable message used in the case of errors (optional)
"""
assert status in _STATUS2STR
self._status = status
if status == self.S_DONE:
self.check_status()
return status
def get_stdin(self) -> str:
return self.input_str
def get_stdout(self) -> str:
"""
Returns a string with the stdout of the calculation.
"""
with open(self.stdout_path, "rt") as out:
return out.read()
def get_stderr(self) -> str:
"""
Return string with the stderr of the calculation.
"""
with open(self.stderr_path, "rt") as err:
return err.read()
def rmtree(self) -> int:
"""
Remove the temporary directory. Return exit status
"""
try:
shutil.rmtree(self.workdir)
return 0
except Exception:
return 1
### ABC PROTOCOL ###
#@abc.abstractproperty
#def parser(self):
# return _
@abc.abstractmethod
def check_status(self):
"""
This function checks the status of the task by inspecting the output and the
error files produced by the application
"""
#import enum
#class CalcType(enum.Enum):
# nor = "non-relativistic"
# sr = "scalar-relativistic"
# fr = "fully-relativistic"
[docs]
class OncvGenerator(_PseudoGenerator):
"""
This object receives an input file for oncvpsp, a string
that defines the type of calculation (scalar-relativistic, ...)
runs the code in a temporary directory and provides methods
to validate/analyze/plot the final results.
Attributes:
retcode: Retcode of oncvpsp
"""
[docs]
@classmethod
def from_file(cls, path: str, calc_type: str, use_mgga: bool, workdir: Optional[str] = None) -> OncvGenerator:
"""
Build the object from a file containing the input parameters.
"""
with open(path, "rt") as fh:
input_str = fh.read()
return cls(input_str, calc_type, use_mgga=use_mgga, workdir=workdir)
def __init__(self, input_str: str, calc_type: str, use_mgga: bool, workdir: Optional[str] = None):
super().__init__(workdir=workdir)
self._input_str = input_str
self.calc_type = calc_type
self.use_mgga = use_mgga
if self.use_mgga:
if calc_type != "scalar-relativistic":
raise ValueError("Only scalar-relativistic pseudos are supported in metagga mode!")
self._executable = {
"scalar-relativistic": which("oncvpspm.x"),
}[calc_type]
else:
self._executable = {
"non-relativistic": which("oncvpspnr.x"),
"scalar-relativistic": which("oncvpsp.x"),
"fully-relativistic": which("oncvpspr.x"),
}[calc_type]
if self._executable is None:
msg = "Cannot find oncvpsp executable in $PATH. Use `export PATH=dir_with_oncvps_executable:$PATH`"
raise RuntimeError(msg)
[docs]
def check_status(self):
"""
Check the status of the run, set and return self.status attribute.
"""
if self._status == self.S_OK:
return self._status
parser = self._parser = OncvParser(self.stdout_path)
try:
parser.scan()
except parser.Error as exc:
cprint(str(exc), color="red")
self._status = self.S_ERROR
return self._status
logger.info("run_completed:", parser.run_completed)
if self.status == self.S_DONE and not parser.run_completed:
logger.info("Run is not completed!")
self._status = self.S_ERROR
if parser.run_completed:
logger.info("setting status to S_OK")
self._status = self.S_OK
psp8_filepath = os.path.join(self.workdir, parser.atsym + ".psp8")
# Write psp8 file.
psp8_str = parser.get_psp8_str()
if psp8_str is not None:
with open(psp8_filepath, "wt") as fh:
fh.write(psp8_str)
# Add UPF string if present.
upf_str = parser.get_upf_str()
if upf_str is not None:
with open(psp8_filepath.replace(".psp8", ".upf"), "wt") as fh:
fh.write(upf_str)
# Initialize self.pseudo from file.
pseudo = Pseudo.from_file(psp8_filepath)
# Add md5 checksum to dojo_report
if pseudo.has_dojo_report:
pseudo.dojo_report["md5"] = pseudo.compute_md5()
pseudo.write_dojo_report(report=pseudo.dojo_report)
if parser.errors:
logger.warning("setting status to S_ERROR")
self._status = self.S_ERROR
return self._status