Source code for abipy.ppcodes.ppgen

# coding: utf-8
"""Interface for pseudopotential generators."""
from __future__ import annotations

import abc
import os
import tempfile
import collections
import shutil
import time

from typing import Optional
from shutil import which
from monty.termcolor import cprint
from abipy.flowtk.pseudos import Pseudo
from abipy.ppcodes.oncv_parser import OncvParser

import logging
logger = logging.getLogger(__name__)


# Possible status of the PseudoGenerator.

_STATUS2STR = collections.OrderedDict([
    (1, "Initialized"),    # PseudoGenerator has been initialized
    (2, "Running"),        # PseudoGenerator is running.
    (3, "Done"),           # Calculation done, This does not imply that results are OK
    (4, "Error"),          # PP generator error.
    (5, "Completed"),      # Execution completed successfully.
])


[docs] class Status(int): """ An integer representing the status of the 'PseudoGenerator`. """ def __repr__(self) -> str: return "<%s: %s, at %s>" % (self.__class__.__name__, str(self), id(self)) def __str__(self) -> str: """String representation.""" return _STATUS2STR[self]
[docs] @classmethod def as_status(cls, obj: Status | str) -> Status: """Convert obj into Status.""" if isinstance(obj, cls): return obj else: # Assume string return cls.from_string(obj)
[docs] @classmethod def from_string(cls, s: str) -> Status: """Return an instance from its string representation.""" for num, text in _STATUS2STR.items(): if text == s: return cls(num) else: raise ValueError(f"Wrong string: `{s}`")
class _PseudoGenerator(metaclass=abc.ABCMeta): """ This object receives a string with the input file and generates a pseudopotential. It calls the pp generator in a subprocess to produce the results in a temporary directory. It also provides an interface to validate/analyze/plot the results produced by the pseudopotential code. Concrete classes must: 1) call super().__init__() in their constructor. 2) the object should have the input file stored in self.input_str Attributes: workdir: Working directory (output results are produced in workdir) status: Flag defining the status of the ps generator. retcode: Return code of the code parser: Output parser. None if results are not available because the calculations is still running or errors pseudo: :class:`Pseudo` object. None if not available """ # Possible status S_INIT = Status.from_string("Initialized") S_RUN = Status.from_string("Running") S_DONE = Status.from_string("Done") S_ERROR = Status.from_string("Error") S_OK = Status.from_string("Completed") ALL_STATUS = [ S_INIT, S_RUN, S_DONE, S_ERROR, S_OK, ] # Basenames for stdin/stdout/stderr. stdin_basename: str = "run.in" stdout_basename: str = "run.out" stderr_basename: str = "run.err" def __init__(self, workdir: Optional[str] = None) -> None: # Set the initial status. self.set_status(self.S_INIT) self._parser = None if workdir is not None: workdir = os.path.abspath(workdir) if os.path.exists(workdir): raise RuntimeError(f"workdir `{workdir}` already exists") self.workdir = workdir else: # Build a temporary directory self.workdir = tempfile.mkdtemp(prefix=self.__class__.__name__) def __repr__(self) -> str: return "<%s at %s>" % (self.__class__.__name__, self.workdir) def __str__(self) -> str: return "<%s at %s, status=%s>" % (self.__class__.__name__, self.workdir, self.status) @property def stdin_path(self) -> str: """Absolute path of the standard input.""" return os.path.join(self.workdir, self.stdin_basename) @property def stdout_path(self) -> str: """Absolute path of the standard output.""" return os.path.join(self.workdir, self.stdout_basename) @property def stderr_path(self) -> str: """Absolute path of the standard error.""" return os.path.join(self.workdir, self.stderr_basename) @property def status(self) -> Status: """The status of the job.""" return self._status @property def retcode(self) -> int | None: """ Return code of the subprocess. None if not available because e.g. the job has not been started yet. """ try: return self._retcode except AttributeError: return None @property def parser(self): return self._parser #@property #def pseudo(self) -> Pseudo | None: # """Pseudo object or None if not available""" # try: # return self._pseudo # except AttributeError: # return None @property def executable(self) -> str: """Name of the executable.""" return self._executable @property def input_str(self) -> str: """String with the input file.""" return self._input_str def start(self) -> int: """" Run the calculation in a subprocess (non-blocking interface) Return 1 if calculation started, 0 otherwise. """ if self.status >= self.S_RUN: return 0 with open(self.stdin_path, "w") as fh: fh.write(self.input_str) # Start the calculation in a subprocess and return. args = [self.executable, "<", self.stdin_path, ">", self.stdout_path, "2>", self.stderr_path] self.cmd_str = " ".join(args) from subprocess import Popen, PIPE self.process = Popen(self.cmd_str, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.workdir) self.set_status(self.S_RUN, info_msg="Start on %s" % time.asctime) return 1 def start_and_wait(self) -> int: """ Run the calculation in a subprocess, wait for it and return exit status. """ self.start() retcode = self.wait() return retcode def poll(self) -> int: """ Check if child process has terminated. Set and return returncode attribute. """ self._retcode = self.process.poll() if self._retcode is not None: self.set_status(self.S_DONE) return self._retcode def wait(self) -> int: """ Wait for child process to terminate. Set and return returncode attribute. """ self._retcode = self.process.wait() self.set_status(self.S_DONE) return self._retcode def kill(self) -> None: """Kill the child.""" self.process.kill() self.set_status(self.S_ERROR) self.errors.append("Process has been killed by host code.") self._retcode = self.process.returncode def set_status(self, status, info_msg=None): """ Set the status. Args: status: Status object or string representation of the status info_msg: string with human-readable message used in the case of errors (optional) """ assert status in _STATUS2STR self._status = status if status == self.S_DONE: self.check_status() return status def get_stdin(self) -> str: return self.input_str def get_stdout(self) -> str: """ Returns a string with the stdout of the calculation. """ with open(self.stdout_path, "rt") as out: return out.read() def get_stderr(self) -> str: """ Return string with the stderr of the calculation. """ with open(self.stderr_path, "rt") as err: return err.read() def rmtree(self) -> int: """ Remove the temporary directory. Return exit status """ try: shutil.rmtree(self.workdir) return 0 except Exception: return 1 ### ABC PROTOCOL ### #@abc.abstractproperty #def parser(self): # return _ @abc.abstractmethod def check_status(self): """ This function checks the status of the task by inspecting the output and the error files produced by the application """ #import enum #class CalcType(enum.Enum): # nor = "non-relativistic" # sr = "scalar-relativistic" # fr = "fully-relativistic"
[docs] class OncvGenerator(_PseudoGenerator): """ This object receives an input file for oncvpsp, a string that defines the type of calculation (scalar-relativistic, ...) runs the code in a temporary directory and provides methods to validate/analyze/plot the final results. Attributes: retcode: Retcode of oncvpsp """
[docs] @classmethod def from_file(cls, path: str, calc_type: str, use_mgga: bool, workdir: Optional[str] = None) -> OncvGenerator: """ Build the object from a file containing the input parameters. """ with open(path, "rt") as fh: input_str = fh.read() return cls(input_str, calc_type, use_mgga=use_mgga, workdir=workdir)
def __init__(self, input_str: str, calc_type: str, use_mgga: bool, workdir: Optional[str] = None): super().__init__(workdir=workdir) self._input_str = input_str self.calc_type = calc_type self.use_mgga = use_mgga if self.use_mgga: if calc_type != "scalar-relativistic": raise ValueError("Only scalar-relativistic pseudos are supported in metagga mode!") self._executable = { "scalar-relativistic": which("oncvpspm.x"), }[calc_type] else: self._executable = { "non-relativistic": which("oncvpspnr.x"), "scalar-relativistic": which("oncvpsp.x"), "fully-relativistic": which("oncvpspr.x"), }[calc_type] if self._executable is None: msg = "Cannot find oncvpsp executable in $PATH. Use `export PATH=dir_with_oncvps_executable:$PATH`" raise RuntimeError(msg)
[docs] def check_status(self): """ Check the status of the run, set and return self.status attribute. """ if self._status == self.S_OK: return self._status parser = self._parser = OncvParser(self.stdout_path) try: parser.scan() except parser.Error as exc: cprint(str(exc), color="red") self._status = self.S_ERROR return self._status logger.info("run_completed:", parser.run_completed) if self.status == self.S_DONE and not parser.run_completed: logger.info("Run is not completed!") self._status = self.S_ERROR if parser.run_completed: logger.info("setting status to S_OK") self._status = self.S_OK psp8_filepath = os.path.join(self.workdir, parser.atsym + ".psp8") # Write psp8 file. psp8_str = parser.get_psp8_str() if psp8_str is not None: with open(psp8_filepath, "wt") as fh: fh.write(psp8_str) # Add UPF string if present. upf_str = parser.get_upf_str() if upf_str is not None: with open(psp8_filepath.replace(".psp8", ".upf"), "wt") as fh: fh.write(upf_str) # Initialize self.pseudo from file. pseudo = Pseudo.from_file(psp8_filepath) # Add md5 checksum to dojo_report if pseudo.has_dojo_report: pseudo.dojo_report["md5"] = pseudo.compute_md5() pseudo.write_dojo_report(report=pseudo.dojo_report) if parser.errors: logger.warning("setting status to S_ERROR") self._status = self.S_ERROR return self._status