# coding: utf-8
"""Interface for pseudopotential generators."""
from __future__ import annotations
import abc
import os
import tempfile
import collections
import shutil
import time
from typing import Union, Optional # Any,
from shutil import which
from monty.termcolor import cprint
from abipy.flowtk.pseudos import Pseudo
from abipy.ppcodes.oncv_parser import OncvParser
import logging
logger = logging.getLogger(__name__)
# Possible status of the PseudoGenerator.
_STATUS2STR = collections.OrderedDict([
(1, "Initialized"), # PseudoGenerator has been initialized
(2, "Running"), # PseudoGenerator is running.
(3, "Done"), # Calculation done, This does not imply that results are OK
(4, "Error"), # PP generator error.
(5, "Completed"), # Execution completed successfully.
])
[docs]
class Status(int):
"""
An integer representing the status of the 'PseudoGenerator`.
"""
def __repr__(self) -> str:
return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self))
def __str__(self) -> str:
"""String representation."""
return _STATUS2STR[self]
[docs]
@classmethod
def as_status(cls, obj: Union[Status, str]) -> Status:
"""Convert obj into Status."""
if isinstance(obj, cls):
return obj
else:
# Assume string
return cls.from_string(obj)
[docs]
@classmethod
def from_string(cls, s: str) -> Status:
"""Return an instance from its string representation."""
for num, text in _STATUS2STR.items():
if text == s:
return cls(num)
else:
raise ValueError(f"Wrong string: `{s}`")
class _PseudoGenerator(metaclass=abc.ABCMeta):
"""
This object receives a string with the input file and generates a pseudopotential.
It calls the pp generator in a subprocess to produce the results in a temporary directory.
It also provides an interface to validate/analyze/plot the results
produced by the pseudopotential code.
Concrete classes must:
1) call super().__init__() in their constructor.
2) the object should have the input file stored in self.input_str
Attributes:
workdir: Working directory (output results are produced in workdir)
status: Flag defining the status of the ps generator.
retcode: Return code of the code
parser: Output parser. None if results are not available because
the calculations is still running or errors
pseudo:
:class:`Pseudo` object. None if not available
"""
# Possible status
S_INIT = Status.from_string("Initialized")
S_RUN = Status.from_string("Running")
S_DONE = Status.from_string("Done")
S_ERROR = Status.from_string("Error")
S_OK = Status.from_string("Completed")
ALL_STATUS = [
S_INIT,
S_RUN,
S_DONE,
S_ERROR,
S_OK,
]
# Basenames for stdin/stdout/stderr.
stdin_basename: str = "run.in"
stdout_basename: str = "run.out"
stderr_basename: str = "run.err"
def __init__(self, workdir: Optional[str] = None) -> None:
# Set the initial status.
self.set_status(self.S_INIT)
self._parser = None
if workdir is not None:
workdir = os.path.abspath(workdir)
if os.path.exists(workdir):
raise RuntimeError(f"workdir `{workdir}` already exists")
self.workdir = workdir
else:
# Build a temporary directory
self.workdir = tempfile.mkdtemp(prefix=self.__class__.__name__)
def __repr__(self) -> str:
return "<%s at %s>" % (self.__class__.__name__, self.workdir)
def __str__(self) -> str:
return "<%s at %s, status=%s>" % (self.__class__.__name__, self.workdir, self.status)
@property
def stdin_path(self) -> str:
"""Absolute path of the standard input."""
return os.path.join(self.workdir, self.stdin_basename)
@property
def stdout_path(self) -> str:
"""Absolute path of the standard output."""
return os.path.join(self.workdir, self.stdout_basename)
@property
def stderr_path(self) -> str:
"""Absolute path of the standard error."""
return os.path.join(self.workdir, self.stderr_basename)
@property
def status(self) -> Status:
"""The status of the job."""
return self._status
@property
def retcode(self) -> Union[int, None]:
"""
Return code of the subprocess.
None if not available because e.g. the job has not been started yet.
"""
try:
return self._retcode
except AttributeError:
return None
@property
def parser(self):
return self._parser
#@property
#def pseudo(self) -> Union[Pseudo, None]:
# """Pseudo object or None if not available"""
# try:
# return self._pseudo
# except AttributeError:
# return None
@property
def executable(self) -> str:
"""Name of the executable."""
return self._executable
@property
def input_str(self) -> str:
"""String with the input file."""
return self._input_str
def start(self) -> int:
""""
Run the calculation in a subprocess (non-blocking interface)
Return 1 if calculation started, 0 otherwise.
"""
if self.status >= self.S_RUN:
return 0
with open(self.stdin_path, "w") as fh:
fh.write(self.input_str)
# Start the calculation in a subprocess and return.
args = [self.executable, "<", self.stdin_path, ">", self.stdout_path, "2>", self.stderr_path]
self.cmd_str = " ".join(args)
from subprocess import Popen, PIPE
self.process = Popen(self.cmd_str, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.workdir)
self.set_status(self.S_RUN, info_msg="Start on %s" % time.asctime)
return 1
def start_and_wait(self) -> int:
"""
Run the calculation in a subprocess, wait for it and return exit status.
"""
self.start()
retcode = self.wait()
return retcode
def poll(self) -> int:
"""
Check if child process has terminated. Set and return returncode attribute.
"""
self._retcode = self.process.poll()
if self._retcode is not None:
self.set_status(self.S_DONE)
return self._retcode
def wait(self) -> int:
"""
Wait for child process to terminate. Set and return returncode attribute.
"""
self._retcode = self.process.wait()
self.set_status(self.S_DONE)
return self._retcode
def kill(self) -> None:
"""Kill the child."""
self.process.kill()
self.set_status(self.S_ERROR)
self.errors.append("Process has beed killed by host code.")
self._retcode = self.process.returncode
def set_status(self, status, info_msg=None):
"""
Set the status.
Args:
status: Status object or string representation of the status
info_msg: string with human-readable message used in the case of errors (optional)
"""
assert status in _STATUS2STR
self._status = status
if status == self.S_DONE:
self.check_status()
return status
def get_stdin(self) -> str:
return self.input_str
def get_stdout(self) -> str:
"""
Returns a string with the stdout of the calculation.
"""
with open(self.stdout_path, "rt") as out:
return out.read()
def get_stderr(self) -> str:
"""
Return string with the stderr of the calculation.
"""
with open(self.stderr_path, "rt") as err:
return err.read()
def rmtree(self) -> int:
"""
Remove the temporary directory. Return exit status
"""
try:
shutil.rmtree(self.workdir)
return 0
except Exception:
return 1
### ABC PROTOCOL ###
#@abc.abstractproperty
#def parser(self):
# return _
@abc.abstractmethod
def check_status(self):
"""
This function checks the status of the task by inspecting the output and the
error files produced by the application
"""
#import enum
#class CalcType(enum.Enum):
# nor = "non-relativistic"
# sr = "scalar-relativistic"
# fr = "fully-relativistic"
[docs]
class OncvGenerator(_PseudoGenerator):
"""
This object receives an input file for oncvpsp, a string
that defines the type of calculation (scalar-relativistic, ...)
runs the code in a temporary directory and provides methods
to validate/analyze/plot the final results.
Attributes:
retcode: Retcode of oncvpsp
"""
[docs]
@classmethod
def from_file(cls, path: str, calc_type: str, use_mgga: bool, workdir: Optional[str] = None) -> OncvGenerator:
"""
Build the object from a file containing the input parameters.
"""
with open(path, "rt") as fh:
input_str = fh.read()
return cls(input_str, calc_type, use_mgga=use_mgga, workdir=workdir)
def __init__(self, input_str: str, calc_type: str, use_mgga: bool, workdir: Optional[str] = None):
super().__init__(workdir=workdir)
self._input_str = input_str
self.calc_type = calc_type
self.use_mgga = use_mgga
if self.use_mgga:
if calc_type != "scalar-relativistic":
raise ValueError("Only scalar-relativistic pseudos are supported in metagga mode!")
self._executable = {
"scalar-relativistic": which("oncvpspm.x"),
}[calc_type]
else:
self._executable = {
"non-relativistic": which("oncvpspnr.x"),
"scalar-relativistic": which("oncvpsp.x"),
"fully-relativistic": which("oncvpspr.x"),
}[calc_type]
if self._executable is None:
msg = "Cannot find oncvpsp executable in $PATH. Use `export PATH=dir_with_oncvps_executable:$PATH`"
raise RuntimeError(msg)
[docs]
def check_status(self):
"""
Check the status of the run, set and return self.status attribute.
"""
if self._status == self.S_OK:
return self._status
parser = self._parser = OncvParser(self.stdout_path)
try:
parser.scan()
except parser.Error as exc:
cprint(str(exc), color="red")
self._status = self.S_ERROR
return self._status
logger.info("run_completed:", parser.run_completed)
if self.status == self.S_DONE and not parser.run_completed:
logger.info("Run is not completed!")
self._status = self.S_ERROR
if parser.run_completed:
logger.info("setting status to S_OK")
self._status = self.S_OK
psp8_filepath = os.path.join(self.workdir, parser.atsym + ".psp8")
# Write psp8 file.
psp8_str = parser.get_psp8_str()
if psp8_str is not None:
with open(psp8_filepath, "wt") as fh:
fh.write(psp8_str)
# Add UPF string if present.
upf_str = parser.get_upf_str()
if upf_str is not None:
with open(psp8_filepath.replace(".psp8", ".upf"), "wt") as fh:
fh.write(upf_str)
# Initialize self.pseudo from file.
pseudo = Pseudo.from_file(psp8_filepath)
# Add md5 checksum to dojo_report
if pseudo.has_dojo_report:
pseudo.dojo_report["md5"] = pseudo.compute_md5()
pseudo.write_dojo_report(report=pseudo.dojo_report)
if parser.errors:
logger.warning("setting status to S_ERROR")
self._status = self.S_ERROR
return self._status