Source code for abipy.data

"""
Functions providing access to file data for unit tests and tutorials.
The preferred way to import this module is via the syntax:

    import abipy.data as abidata
"""
import os

from abipy.core.structure import Structure
from abipy.flowtk import Pseudo, PseudoTable
from abipy.data.ucells import structure_from_ucell


__all__ = [
    "cif_file",
    "pseudo",
    "pseudos",
    "ref_file",
    "ref_files",
    "structure_from_ucell",
]

dirpath = os.path.dirname(__file__)

_CIF_DIRPATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "cifs"))

_PSEUDOS_DIRPATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "pseudos"))

_VARIABLES_DIRPATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "variables"))

_MPDATA_DIRPATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "mpdata"))

_SCRIPTS_DIRPATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "examples"))
_SCRIPTS = None


def pyscript(basename):
    """
    Return the absolute path of one of the scripts in the `examples` directory from its basename.
    """
    global _SCRIPTS
    if _SCRIPTS is None:
        # Build mapping basename --> path.
        from monty.os.path import find_exts
        pypaths = find_exts(_SCRIPTS_DIRPATH, ".py", exclude_dirs="_*|.*|develop", match_mode="basename")
        _SCRIPTS = {}
        for p in pypaths:
            k = os.path.basename(p)
            # Ignore e.g. __init__.py and private scripts.
            if k.startswith("_"): continue
            if k in _SCRIPTS:
                raise ValueError("Found duplicated basenames with name %s\nActual: %s\nPrevious: %s" % (k, p, _SCRIPTS[k]))
            _SCRIPTS[k] = p

    return _SCRIPTS[basename]
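
# Example usage (illustrative only, not part of the module): resolve an example
# script by its basename. "plot_something.py" is a hypothetical name; pass the
# basename of any script that actually lives under the `examples` directory.
#
#   import abipy.data as abidata
#   script_path = abidata.pyscript("plot_something.py")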


def cif_file(filename):
    """Returns the absolute path of the CIF file in tests/data/cifs."""
    return os.path.join(_CIF_DIRPATH, filename)


def cif_files(*filenames):
    """Returns the absolute paths of the CIF files in tests/data/cifs."""
    return list(map(cif_file, filenames))


def structure_from_cif(filename):
    """Return an Abipy structure from the basename of the CIF file in data/cifs."""
    return Structure.from_file(cif_file(filename))


pseudo_dir = _PSEUDOS_DIRPATH
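
# Example usage (illustrative only): "si.cif" is a hypothetical basename that is
# assumed to be present in abipy/data/cifs.
#
#   import abipy.data as abidata
#   cif_path = abidata.cif_file("si.cif")               # absolute path of the CIF file
#   structure = abidata.structure_from_cif("si.cif")    # Structure object built from that file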


def pseudo(filename):
    """Returns a `Pseudo` object."""
    filepath = os.path.join(_PSEUDOS_DIRPATH, filename)
    return Pseudo.from_file(filepath)


def pseudos(*filenames):
    """Returns a PseudoTable constructed from the input filenames located in tests/data/pseudos."""
    pseudos = list(map(pseudo, filenames))
    return PseudoTable(pseudos)
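
# Example usage (illustrative only): "14si.pspnc" is the basename used in the
# commented AbinitFilesGenerator example below and is assumed to be available
# in abipy/data/pseudos.
#
#   import abipy.data as abidata
#   si_pseudo = abidata.pseudo("14si.pspnc")    # single Pseudo object
#   table = abidata.pseudos("14si.pspnc")       # PseudoTable built from one or more basenames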


def var_file(filename):
    """Returns the path of a yml file located in data/variables."""
    return os.path.join(_VARIABLES_DIRPATH, filename)


def find_ncfiles(top, verbose=0):
    """
    Find all netcdf files starting from the top-level directory `top`.
    Filenames must be unique. Directories starting with "tmp_" are excluded from the search.

    Returns: dictionary with mapping: basename --> absolute path.
    """
    ncfiles = {}
    for dirpath, dirnames, filenames in os.walk(top, topdown=True):
        # Exclude directories whose name starts with "tmp_" or "_" from the walk.
        dirnames[:] = [d for d in dirnames if not (d.startswith("tmp_") or d.startswith("_"))]

        for basename in filenames:
            apath = os.path.join(dirpath, basename)
            if basename.endswith(".nc"):
                if basename in ncfiles:
                    err_msg = "Found duplicated basename %s\n" % basename
                    err_msg += "Stored: %s, new %s\n" % (ncfiles[basename], apath)
                    if not verbose:
                        import warnings
                        warnings.warn(err_msg)
                        #raise ValueError(err_msg)
                else:
                    ncfiles[basename] = apath

    return ncfiles


_DATA_NCFILES = find_ncfiles(top=os.path.join(os.path.dirname(__file__), "refs"))


def ref_file(basename):
    """Returns the absolute path of basename in tests/data directory."""
    if basename in _DATA_NCFILES:
        return _DATA_NCFILES[basename]
    else:
        path = os.path.join(dirpath, basename)
        if not os.path.exists(path):
            raise ValueError("Cannot find reference file `%s`, at abs_path: `%s`" % (basename, path))
        return path


def ref_files(*basenames):
    """List with the absolute paths of basenames in tests/data directory."""
    return list(map(ref_file, basenames))
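
# Example usage (illustrative only): "si_nscf_GSR.nc" and "si_DEN-etsf.nc" are
# taken from the commented files_to_save mapping below; any netcdf basename
# indexed by find_ncfiles() (or a plain file in the data directory) can be passed.
#
#   import abipy.data as abidata
#   gsr_path = abidata.ref_file("si_nscf_GSR.nc")
#   paths = abidata.ref_files("si_nscf_GSR.nc", "si_DEN-etsf.nc")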


def ncfiles_with_ext(ext):
    """Return a list with the absolute paths of the files with extension `ext`."""
    ncfiles = []
    for basename, path in _DATA_NCFILES.items():
        f = basename.rstrip(".nc").rstrip("-etsf")
        if f.endswith("_" + ext):
            ncfiles.append(path)

    return ncfiles


_MP_STRUCT_DICT = None


def get_mp_structures_dict():
    """Returns a dictionary containing the structures stored in mpdata/mp_structures.json."""
    global _MP_STRUCT_DICT
    if _MP_STRUCT_DICT is not None:
        return _MP_STRUCT_DICT

    import json
    from monty.json import MontyDecoder
    with open(os.path.join(_MPDATA_DIRPATH, 'mp_structures.json'), 'rt') as f:
        _MP_STRUCT_DICT = json.load(f, cls=MontyDecoder)

    # Change Structure class
    for k, v in _MP_STRUCT_DICT.items():
        _MP_STRUCT_DICT[k].__class__ = Structure

    return _MP_STRUCT_DICT


def structure_from_mpid(mpid):
    """
    Return an Abipy Structure from the `mpid` identifier. See mpdata/mp_structures.json
    """
    d = get_mp_structures_dict()
    if mpid not in d:
        raise KeyError("%s not in dictionary keys:\n%s" % (mpid, list(d.keys())))
    return d[mpid]


WFK_NCFILES = ncfiles_with_ext("WFK")
DEN_NCFILES = ncfiles_with_ext("DEN")
GSR_NCFILES = ncfiles_with_ext("GSR")
SIGRES_NCFILES = ncfiles_with_ext("SIGRES")
ALL_NCFILES = list(_DATA_NCFILES.values())


class FilesGenerator(object):
    """This class generates the output files used in the unit tests and in the examples."""

    def __init__(self, **kwargs):
        """
        Args:
            workdir: Working directory.
            finalize: True if self.finalize is called.
            verbose: Verbosity level.
        """
        if not hasattr(self, "files_to_save"):
            raise ValueError("files_to_save is not defined")

        self.workdir = os.path.abspath(kwargs.pop("workdir", "."))
        self.finalize = kwargs.pop("finalize", True)
        self.verbose = kwargs.pop("verbose", 1)

        self.files_to_keep = set([os.path.basename(__file__), "run.abi", "run.abo"] + list(self.files_to_save.keys()))

    def __str__(self):
        lines = []
        app = lines.append
        app("%s: workdir:%s" % (self.__class__.__name__, self.workdir))
        return "\n".join(lines)

    def run(self):
        """Run Abinit and rename output files. Return 0 if success."""
        from monty.os.path import which
        if which(self.executable) is None:
            raise RuntimeError("Cannot find %s in $PATH" % self.executable)

        os.chdir(self.workdir)
        process = self._run()
        process.wait()

        if process.returncode != 0:
            print("returncode == %s" % process.returncode)
            print(process.stderr.readlines())
            return process.returncode

        if self.finalize:
            self._finalize()

        return 0

    def _run(self):
        from subprocess import Popen, PIPE
        with open(os.path.join(self.workdir, "run.files"), "wt") as fh:
            fh.write(self.make_filesfile_str())

        cmd = self.executable + " < run.files > run.log"
        return Popen(cmd, shell=True, stderr=PIPE, stdout=PIPE, cwd=self.workdir)

    def _finalize(self):
        all_files = os.listdir(self.workdir)

        # Remove the files that are not listed in files_to_keep.
        garbage = [f for f in all_files if f not in self.files_to_keep]
        for f in garbage:
            if f.endswith(".py"): continue
            if self.verbose: print("Will remove file %s" % f)
            os.remove(f)

        # Rename the output files that must be preserved.
        for old, new in self.files_to_save.items():
            if self.verbose: print("Will rename %s --> %s" % (old, new))
            os.rename(old, new)


class AbinitFilesGenerator(FilesGenerator):
    # Subclasses must define the following class attributes:
    # 1) List of pseudos (basenames in abipy/data/pseudos).
    #pseudos = ["14si.pspnc"]
    # 2) Mapping old_name --> new_name for the output files that must be preserved.
    #files_to_save = {
    #    "out_DS1_DEN-etsf.nc": "si_DEN-etsf.nc",
    #    "out_DS2_GSR.nc": "si_nscf_GSR.nc",
    #}
    executable = "abinit"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Add absolute paths for the pseudopotentials.
        #self.pseudos = [p.filepath for p in pseudos(*self.pseudos)]
        self.pseudos = [os.path.join(_PSEUDOS_DIRPATH, pname) for pname in self.pseudos]

    def make_filesfile_str(self):
        return "\n".join(["run.abi", "run.abo", "in", "out", "tmp"] + self.pseudos)


class AnaddbFilesGenerator(FilesGenerator):
    # Subclasses must define the following class attributes:
    # 1) Mapping old_name --> new_name for the output files that must be preserved.
    #files_to_save = {
    #    "trf2_5.out_PHBST.nc": "trf2_5.out_PHBST.nc",
    #    "trf2_5.out_PHDOS.nc": "trf2_5.out_PHDOS.nc",
    #}
    # 2) Input DDB (mandatory).
    in_ddb = None
    # 3) Output DDB (optional).
    out_ddb = "dummy_out_ddb"
    # 4) Input GKK (optional).
    in_gkk = "dummy_in_gkk"
    # 5) Base name for elphon output files (optional).
    elph_basename = "dummy_elph_basename"
    # 6) File containing ddk filenames for elphon/transport.
    in_ddk = "dummy_in_ddk"

    executable = "anaddb"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if self.in_ddb is None:
            raise ValueError("in_ddb must be specified")

        self.files_to_keep.update([
            self.in_ddb,
            self.out_ddb,
            self.in_gkk,
            self.elph_basename,
            self.in_ddk,
        ])

    def make_filesfile_str(self):
        # The anaddb files file lists, in order:
        # 1) formatted input file
        # 2) formatted output file e.g. t13.out
        # 3) input derivative database e.g. t13.ddb.in
        # 4) output molecular dynamics e.g. t13.md
        # 5) input elphon matrix elements (GKK file)
        # 6) base name for elphon output files e.g. t13
        # 7) file containing ddk filenames for elphon/transport
        return "\n".join([
            "run.abi",
            "out",
            self.in_ddb,
            self.out_ddb,
            self.in_gkk,
            self.elph_basename,
            self.in_ddk,
        ])
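

# Minimal sketch (not part of the module) showing how the FilesGenerator
# subclasses above are meant to be used. The class name "SiFilesGenerator" is
# hypothetical; the pseudo and file names are taken from the commented class
# attributes of AbinitFilesGenerator. run() assumes the abinit executable is
# in $PATH and that a valid "run.abi" input sits in the working directory.
#
#   class SiFilesGenerator(AbinitFilesGenerator):
#       pseudos = ["14si.pspnc"]
#       files_to_save = {
#           "out_DS1_DEN-etsf.nc": "si_DEN-etsf.nc",
#           "out_DS2_GSR.nc": "si_nscf_GSR.nc",
#       }
#
#   retcode = SiFilesGenerator(workdir=".").run()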