Source code for abipy.abio.outputs

"""
Objects used to extract and plot results from output files in text format.
"""
from __future__ import annotations

import os
import numpy as np
import pandas as pd

from collections import OrderedDict
from io import StringIO
from typing import Union
from monty.string import is_string, marquee
from monty.functools import lazy_property
from monty.termcolor import cprint
from pymatgen.core.units import bohr_to_ang
from abipy.core.symmetries import AbinitSpaceGroup
from abipy.core.structure import Structure, dataframes_from_structures
from abipy.core.kpoints import has_timrev_from_kptopt
from abipy.core.mixins import TextFile, AbinitNcFile, NotebookWriter
from abipy.tools.typing import Figure
from abipy.abio.inputs import GEOVARS
from abipy.abio.timer import AbinitTimerParser
from abipy.abio.robots import Robot
from abipy.flowtk import EventsParser, NetcdfReader, GroundStateScfCycle, D2DEScfCycle


class AbinitTextFile(TextFile):
    """
    Base class for the ABINIT main output files and log files.
    """

    @property
    def events(self) -> list:
        """
        List of ABINIT events reported in the file.
        """
        # Parse the file the first time the property is accessed or when mtime is changed.
        stat = os.stat(self.filepath)
        if stat.st_mtime != self._last_mtime or not hasattr(self, "_events"):
            self._events = EventsParser().parse(self.filepath)
        return self._events

    def get_timer(self) -> AbinitTimerParser:
        """
        Timer data.
        """
        timer = AbinitTimerParser()
        timer.parse(self.filepath)
        return timer
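# Illustrative usage sketch (not part of the original module): both the main output
# and the log file expose ``events`` and ``get_timer``. The filename "run.abo" below
# is a placeholder for an existing ABINIT output file.
#
#   from abipy import abilab
#   with abilab.abiopen("run.abo") as abo:
#       print(abo.events)          # warnings/comments/errors reported in the file
#       timer = abo.get_timer()    # AbinitTimerParser (empty if timopt was not activated)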
class AbinitLogFile(AbinitTextFile, NotebookWriter):
    """
    Class representing the ABINIT log file.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: AbinitLogFile
    """

    def to_string(self, verbose=0) -> str:
        """String representation with verbosity level verbose."""
        return str(self.events)

    def plot(self, **kwargs):
        """Empty placeholder."""
        return None

    #@add_fig_kwargs
    #def plot_mem(self, **kwargs) -> Figure:
    #    return fig

    #@add_fig_kwargs
    #def plot_time(self, **kwargs) -> Figure:
    #    return fig

    def yield_figs(self, **kwargs):  # pragma: no cover
        """
        This function *generates* a predefined list of matplotlib figures with minimal input from the user.
        """
        yield None
    def write_notebook(self, nbpath=None) -> str:
        """
        Write a jupyter_ notebook to ``nbpath``. If nbpath is None, a temporary file in the current
        working directory is created. Return path to the notebook.
        """
        nbformat, nbv, nb = self.get_nbformat_nbv_nb(title=None)

        nb.cells.extend([
            nbv.new_code_cell("abilog = abilab.abiopen('%s')" % self.filepath),
            nbv.new_code_cell("print(abilog.events)"),
        ])

        return self._write_nb_nbpath(nb, nbpath)
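# Illustrative sketch (not part of the original module): AbinitLogFile is mainly used
# to inspect the events reported in the ABINIT log. "run.log" is a placeholder path.
#
#   from abipy import abilab
#   with abilab.abiopen("run.log") as abilog:
#       print(abilog.to_string())      # same as str(abilog.events)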
class AbinitOutputFile(AbinitTextFile, NotebookWriter):
    """
    Class representing the main Abinit output file.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: AbinitOutputFile
    """
    # TODO: Extract number of errors and warnings.

    def __init__(self, filepath: str):
        super().__init__(filepath)
        self.debug_level = 0
        self._parse()

    def _parse(self) -> None:
        """
        header: String with the input variables
        footer: String with the output variables
        datasets: Dictionary mapping dataset index to list of strings.
        """
        # Get code version and find magic line signaling that the output file is completed.
        self.version, self.run_completed = None, False
        self.overall_cputime, self.overall_walltime = 0.0, 0.0
        self.proc0_cputime, self.proc0_walltime = 0.0, 0.0

        with open(self.filepath, "rt") as fh:
            for line in fh:
                if self.version is None and line.startswith(".Version"):
                    self.version = line.split()[1]

                if line.startswith("- Proc."):
                    #- Proc.   0 individual time (sec): cpu=         25.5  wall=         26.1
                    tokens = line.split()
                    self.proc0_walltime = float(tokens[-1])
                    self.proc0_cputime = float(tokens[-3])

                if line.startswith("+Overall time"):
                    #+Overall time at end (sec) : cpu=         25.5  wall=         26.1
                    tokens = line.split()
                    self.overall_cputime = float(tokens[-3])
                    self.overall_walltime = float(tokens[-1])

                if " Calculation completed." in line:
                    self.run_completed = True

        # Parse header to get important dimensions and variables
        self.header, self.footer, self.datasets = [], [], OrderedDict()
        where = "in_header"

        with open(self.filepath, "rt") as fh:
            for line in fh:
                if "== DATASET" in line:
                    # Save dataset number
                    # == DATASET  1 ==================================================================
                    where = int(line.replace("=", "").split()[-1])
                    assert where not in self.datasets
                    self.datasets[where] = []
                elif "== END DATASET(S) " in line:
                    where = "in_footer"

                if where == "in_header":
                    self.header.append(line)
                elif where == "in_footer":
                    self.footer.append(line)
                else:
                    # dataset number --> lines
                    self.datasets[where].append(line)

        self.header = "".join(self.header)
        if self.debug_level: print("header:\n", self.header)

        # Output files produced in dryrun_mode contain the following line:
        # abinit : before driver, prtvol=0, debugging mode => will skip driver
        self.dryrun_mode = "debugging mode => will skip driver" in self.header
        #print("dryrun_mode:", self.dryrun_mode)

        #if " jdtset " in self.header: raise NotImplementedError("jdtset is not supported")
        #if " udtset " in self.header: raise NotImplementedError("udtset is not supported")

        self.ndtset = len(self.datasets)
        if not self.datasets:
            #raise NotImplementedError("Empty dataset sections.")
            self.ndtset = 1
            self.datasets[1] = "Empty dataset"

        for key, data in self.datasets.items():
            if self.debug_level: print("data")
            self.datasets[key] = "".join(data)
            if self.debug_level: print(self.datasets[key])

        self.footer = "".join(self.footer)
        if self.debug_level: print("footer:\n", self.footer)

        self.initial_vars_global, self.initial_vars_dataset = self._parse_variables("header")
        self.final_vars_global, self.final_vars_dataset = None, None
        if self.run_completed:
            if self.dryrun_mode:
                # footer is not present. Copy values from header.
                self.final_vars_global, self.final_vars_dataset = self.initial_vars_global, self.initial_vars_dataset
            else:
                self.final_vars_global, self.final_vars_dataset = self._parse_variables("footer")

    def _parse_variables(self, what: str):
        vars_global = OrderedDict()
        vars_dataset = OrderedDict([(k, OrderedDict()) for k in self.datasets.keys()])
        #print("keys", vars_dataset.keys())

        lines = getattr(self, what).splitlines()
        if what == "header":
            magic_start = " -outvars: echo values of preprocessed input variables --------"
        elif what == "footer":
            magic_start = " -outvars: echo values of variables after computation --------"
        else:
            raise ValueError("Invalid value for what: `%s`" % str(what))

        magic_stop = "================================================================================"

        # Select relevant portion with variables.
        for i, line in enumerate(lines):
            if magic_start in line:
                break
        else:
            raise ValueError("Cannot find magic_start line: `%s`\nPerhaps this is not an Abinit output file!" % magic_start)
        lines = lines[i+1:]

        for i, line in enumerate(lines):
            if magic_stop in line:
                break
        else:
            raise ValueError("Cannot find magic_stop line: `%s`\nPerhaps this is not an Abinit output file!" % magic_stop)
        lines = lines[:i]

        # Parse data. Assume format:
        #   timopt          -1
        #    tnons      0.0000000  0.0000000  0.0000000     0.2500000  0.2500000  0.2500000
        #               0.0000000  0.0000000  0.0000000     0.2500000  0.2500000  0.2500000
        def get_dtindex_key_value(line):
            tokens = line.split()
            s, value = tokens[0], " ".join(tokens[1:])
            l = []
            for i, c in enumerate(s[::-1]):
                if c.isalpha():
                    key = s[:len(s)-i]
                    break
                l.append(c)
            else:
                raise ValueError("Cannot find dataset index in token: %s\n" % s)
            #print(line, "\n", l)
            dtindex = None
            if l:
                l.reverse()
                dtindex = int("".join(l))
            return dtindex, key, value

        # (varname, dtindex), [line1, line2 ...]
        stack_var, stack_lines = None, []
        def pop_stack():
            if stack_lines:
                key, dtidx = stack_var
                value = " ".join(stack_lines)
                if dtidx is None:
                    vars_global[key] = value
                else:
                    try:
                        vars_dataset[dtidx][key] = value
                    except KeyError:
                        if dtidx == 0:
                            vars_global[key] = value
                        else:
                            raise Exception("dataset index != 0 but still not found in vars_dataset")

        for line in lines:
            if not line: continue
            # Ignore first char
            line = line[1:].lstrip().rstrip()
            if not line: continue
            #print("line", line)
            if line[0].isalpha():
                pop_stack()
                stack_lines = []
                dtidx, key, value = get_dtindex_key_value(line)
                stack_var = (key, dtidx)
                stack_lines.append(value)
            else:
                stack_lines.append(line)

        pop_stack()

        return vars_global, vars_dataset

    def _get_structures(self, what: str) -> list[Structure]:
        if what == "header":
            vars_global, vars_dataset = self.initial_vars_global, self.initial_vars_dataset
        elif what == "footer":
            vars_global, vars_dataset = self.final_vars_global, self.final_vars_dataset
        else:
            raise ValueError("Invalid value for what: `%s`" % str(what))

        #print("global", vars_global["acell"])
        from abipy.abio.abivars import is_abiunit
        inigeo = {k: vars_global[k] for k in GEOVARS if k in vars_global}

        spgvars = ("spgroup", "symrel", "tnons", "symafm")
        spgd_global = {k: vars_global[k] for k in spgvars if k in vars_global}
        global_kptopt = vars_global.get("kptopt", 1)

        structures = []
        for i in self.datasets:
            # This code breaks down if there are conflicting GEOVARS in globals and dataset.
            d = inigeo.copy()
            d.update({k: vars_dataset[i][k] for k in GEOVARS if k in vars_dataset[i]})

            for key, value in d.items():
                # Must handle possible unit.
                fact = 1.0
                tokens = [t.lower() for t in value.split()]
                if is_abiunit(tokens[-1]):
                    tokens, unit = tokens[:-1], tokens[-1]
                    if unit in ("angstr", "angstrom", "angstroms"):
                        fact = 1.0 / bohr_to_ang
                    elif unit in ("bohr", "bohrs", "au"):
                        fact = 1.0
                    else:
                        raise ValueError("Don't know how to handle unit: %s" % unit)

                s = " ".join(tokens)
                dtype = float if key not in ("ntypat", "typat", "natom") else int
                try:
                    #print(key, s)
                    value = np.fromstring(s, sep=" ", dtype=dtype)
                    #print(key, value)
                    if fact != 1.0: value *= fact  # Do not change integer arrays e.g typat!
                    d[key] = value
                except ValueError as exc:
                    print(key, s)
                    raise exc

            if "rprim" not in d and "angdeg" not in d: d["rprim"] = np.eye(3)
            if "natom" in d and d["natom"] == 1 and all(k not in d for k in ("xred", "xcart", "xangst")):
                d["xred"] = np.zeros(3)
            #print(d)
            abistr = Structure.from_abivars(d)

            # Extract Abinit spacegroup.
            spgd = spgd_global.copy()
            spgd.update({k: vars_dataset[i][k] for k in spgvars if k in vars_dataset[i]})

            spgid = int(spgd.get("spgroup", 0))
            if "symrel" not in spgd:
                symrel = np.reshape(np.eye(3, 3, dtype=int), (1, 3, 3))
                spgd["symrel"] = " ".join((str(i) for i in symrel.flatten()))
            else:
                symrel = np.reshape(np.array([int(n) for n in spgd["symrel"].split()], dtype=int), (-1, 3, 3))
            nsym = len(symrel)
            assert nsym == spgd.get("nsym", nsym) #; print(symrel.shape)

            if "tnons" in spgd:
                tnons = np.reshape(np.array([float(t) for t in spgd["tnons"].split()], dtype=float), (nsym, 3))
            else:
                tnons = np.zeros((nsym, 3))

            if "symafm" in spgd:
                symafm = np.array([int(n) for n in spgd["symafm"].split()], dtype=int)
                symafm.shape = (nsym,)
            else:
                symafm = np.ones(nsym, dtype=int)

            try:
                has_timerev = has_timrev_from_kptopt(vars_dataset[i].get("kptopt", global_kptopt))
                abi_spacegroup = AbinitSpaceGroup(spgid, symrel, tnons, symafm, has_timerev, inord="C")
                abistr.set_abi_spacegroup(abi_spacegroup)
            except Exception as exc:
                print("Cannot build AbinitSpaceGroup from the variables reported in file!\n", str(exc))

            structures.append(abistr)

        return structures
    @lazy_property
    def initial_structures(self) -> list[Structure]:
        """List of initial |Structure|."""
        return self._get_structures("header")

    @property
    def has_same_initial_structures(self) -> bool:
        """True if all initial structures are equal."""
        return all(self.initial_structures[0] == s for s in self.initial_structures)

    @lazy_property
    def final_structures(self) -> list[Structure]:
        """List of final |Structure|."""
        if self.run_completed:
            return self._get_structures("footer")
        else:
            cprint("Cannot extract final structures from file.\n %s" % self.filepath, "red")
            return []

    @lazy_property
    def initial_structure(self) -> Structure:
        """
        The |Structure| defined in the output file.

        If the input file contains multiple datasets **AND** the datasets have different structures,
        this property returns None. In this case, one has to access the structure of the individual
        datasets. For example: self.initial_structures[0] gives the structure of the first dataset.
        """
        if not self.has_same_initial_structures:
            print("Datasets have different structures. Returning None. Use initial_structures[0]")
            return None
        return self.initial_structures[0]
    @property
    def has_same_final_structures(self) -> bool:
        """True if all final structures are equal."""
        return all(self.final_structures[0] == s for s in self.final_structures)
    @lazy_property
    def final_structure(self) -> Union[Structure, None]:
        """
        The |Structure| defined in the output file.

        If the input file contains multiple datasets **AND** the datasets have different structures,
        this property returns None. In this case, one has to access the structure of the individual
        datasets. For example: self.final_structures[0] gives the structure of the first dataset.
        """
        if not self.has_same_final_structures:
            print("Datasets have different structures. Returning None. Use final_structures[0]")
            return None
        return self.final_structures[0]
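    # Illustrative sketch (not part of the original module): with a multidataset run the
    # per-dataset structures are available via ``initial_structures`` / ``final_structures``,
    # while the singular properties return None if the datasets have different structures.
    # "run.abo" is a placeholder path.
    #
    #   abo = AbinitOutputFile.from_file("run.abo")
    #   if abo.final_structure is not None:
    #       print(abo.final_structure)
    #   else:
    #       print(abo.final_structures[0])   # structure of the first dataset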
    def diff_datasets(self, dt_list1, dt_list2, with_params=True, differ="html", dryrun=False):
        """
        Compare the output of two (lists of) datasets with an external differ.

        Args:
            dt_list1, dt_list2: Dataset index or list of dataset indices to compare.
            with_params: True to include the header/footer with the input variables.
            differ: "html" to produce an HTML diff, else the name of an external diff tool.
            dryrun: Return the diff object/command instead of opening the browser/running the tool.
        """
        if not isinstance(dt_list1, (list, tuple)): dt_list1 = [dt_list1]
        if not isinstance(dt_list2, (list, tuple)): dt_list2 = [dt_list2]

        dt_lists = [dt_list1, dt_list2]

        import tempfile
        tmp_names = []
        for i in range(2):
            _, tmpname = tempfile.mkstemp(text=True)
            tmp_names.append(tmpname)
            with open(tmpname, "wt") as fh:
                if with_params: fh.write(self.header)
                for idt in dt_lists[i]:
                    fh.write(self.datasets[idt])
                if with_params: fh.write(self.footer)

        if differ == "html":
            from abipy.tools.devtools import HtmlDiff
            diff = HtmlDiff(tmp_names)
            if dryrun:
                return diff
            else:
                return diff.open_browser()
        else:
            cmd = "%s %s %s" % (differ, tmp_names[0], tmp_names[1])
            if dryrun:
                return cmd
            else:
                return os.system(cmd)
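    # Illustrative sketch (not part of the original module): compare the first and second
    # dataset sections of a multidataset output with the HTML differ. The path is a placeholder.
    #
    #   abo = AbinitOutputFile.from_file("multi_dataset.abo")
    #   diff = abo.diff_datasets(1, 2, differ="html", dryrun=True)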
    def __str__(self) -> str:
        return self.to_string()

    def to_string(self, verbose: int = 0) -> str:
        """String representation."""
        lines = ["ndtset: %d, completed: %s" % (self.ndtset, self.run_completed)]
        app = lines.append

        # Different cases depending whether final structures are available
        # and whether structures are equivalent.
        if self.run_completed:
            if self.has_same_final_structures:
                if self.initial_structure != self.final_structure:
                    # Structural relaxation.
                    df = dataframes_from_structures([self.initial_structure, self.final_structure],
                                                    index=["initial", "final"])
                    app("Lattice parameters:")
                    app(str(df.lattice))
                    app("Atomic coordinates:")
                    app(str(df.coords))
                else:
                    # initial == final. Print final structure.
                    app(self.final_structure.to_string(verbose=verbose))
        else:
            # Final structures are not available.
            if self.has_same_initial_structures:
                app(self.initial_structure.to_string(verbose=verbose))
            else:
                df = dataframes_from_structures(self.initial_structures,
                                                index=[i+1 for i in range(self.ndtset)])
                app("Lattice parameters:")
                app(str(df.lattice))
                app("Atomic coordinates:")
                app(str(df.coords))

        # Print dataframe with dimensions.
        df = self.get_dims_spginfo_dataframe(verbose=verbose)
        from abipy.tools.printing import print_dataframe
        strio = StringIO()
        print_dataframe(df, file=strio)
        strio.seek(0)
        app("")
        app(marquee("Dimensions of calculation", mark="="))
        app("".join(strio))

        return "\n".join(lines)

    def get_dims_spginfo_dataframe(self, verbose: int = 0) -> pd.DataFrame:
        """
        Parse the section with the dimensions of the calculation. Return Dataframe.
        """
        dims_dataset, spginfo_dataset = self.get_dims_spginfo_dataset(verbose=verbose)
        rows = []
        for dtind, dims in dims_dataset.items():
            d = OrderedDict()
            d["dataset"] = dtind
            d.update(dims)
            d.update(spginfo_dataset[dtind])
            rows.append(d)

        df = pd.DataFrame(rows, columns=list(rows[0].keys()) if rows else None)
        df = df.set_index('dataset')
        return df
    def get_dims_spginfo_dataset(self, verbose=0) -> tuple[dict, dict]:
        """
        Parse the section with the dimensions of the calculation. Return dictionaries.

        Args:
            verbose: Verbosity level.

        Return: (dims_dataset, spginfo_dataset)
            where dims_dataset[i] is an OrderedDict with the dimensions of dataset `i`
            and spginfo_dataset[i] is a dictionary with space group information.
        """
        # If single dataset, we have to parse
        #
        #  Symmetries : space group Fd -3 m (#227); Bravais cF (face-center cubic)
        # ================================================================================
        #  Values of the parameters that define the memory need of the present run
        #     intxc =       0  ionmov =       0    iscf =       7  lmnmax =       6
        #     lnmax =       6   mgfft =      18  mpssoang =     3  mqgrid =    3001
        #     natom =       2  nloc_mem =     1  nspden =       1  nspinor =      1
        #     nsppol =      1    nsym =      48  n1xccc =    2501  ntypat =       1
        #     occopt =      1  xclevel =      2
        # -   mband =       8  mffmem =       1   mkmem =      29
        #     mpw =       202    nfft =    5832    nkpt =      29
        # ================================================================================
        # P This job should need less than   3.389 Mbytes of memory.
        #   Rough estimation (10% accuracy) of disk space for files :
        # _ WF disk file :   0.717 Mbytes ; DEN or POT disk file :   0.046 Mbytes.
        # ================================================================================
        #
        # If multi datasets we have to parse:
        #
        #  DATASET 2 : space group F-4 3 m (#216); Bravais cF (face-center cubic)
        # ================================================================================
        #  Values of the parameters that define the memory need for DATASET 2.
        #     intxc =       0  ionmov =       0    iscf =       7  lmnmax =       2
        #     lnmax =       2   mgfft =      12  mpssoang =     3  mqgrid =    3001
        #     natom =       2  nloc_mem =     1  nspden =       1  nspinor =      1
        #     nsppol =      1    nsym =      24  n1xccc =    2501  ntypat =       2
        #     occopt =      1  xclevel =      1
        # -   mband =      10  mffmem =       1   mkmem =       2
        #     mpw =        69    nfft =    1728    nkpt =       2
        # ================================================================================
        # P This job should need less than   1.331 Mbytes of memory.
        #   Rough estimation (10% accuracy) of disk space for files :
        # _ WF disk file :   0.023 Mbytes ; DEN or POT disk file :   0.015 Mbytes.
        # ================================================================================
        magic = "Values of the parameters that define the memory need"
        memory_pre = "P This job should need less than"
        magic_exit = "------------- Echo of variables that govern the present computation"
        filesizes_pre = "_ WF disk file :"
        #verbose = 1

        def parse_spgline(line):
            """Parse the line with space group info, return dict."""
            # Could use regular expressions ...
            i = line.find("space group")
            if i == -1:
                # the unit cell is not primitive
                return {}
            spg_str, brav_str = line[i:].replace("space group", "").split(";")
            toks = spg_str.split()
            return {
                "spg_symbol": "".join(toks[:-1]),
                "spg_number": int(toks[-1].replace("(", "").replace(")", "").replace("#", "")),
                "bravais": brav_str.strip(),
            }

        from abipy.tools.numtools import grouper
        dims_dataset, spginfo_dataset = OrderedDict(), OrderedDict()
        inblock = 0

        with open(self.filepath, "rt") as fh:
            for line in fh:
                line = line.strip()
                if verbose > 1: print("inblock:", inblock, " at line:", line)

                if line.startswith(magic_exit): break

                if (not line or line.startswith("===") or line.startswith("---")
                        #or line.startswith("P")
                        or line.startswith("Rough estimation")
                        or line.startswith("PAW method is used")):
                    continue

                if line.startswith("DATASET") or line.startswith("Symmetries :"):
                    # Get dataset index, parse space group and lattice info, init new dims dict.
                    inblock = 1
                    if line.startswith("Symmetries :"):
                        # No multidataset
                        dtindex = 1
                    else:
                        tokens = line.split()
                        dtindex = int(tokens[1])
                    dims_dataset[dtindex] = dims = OrderedDict()
                    spginfo_dataset[dtindex] = parse_spgline(line)
                    continue

                if inblock == 1 and line.startswith(magic):
                    inblock = 2
                    continue

                if inblock == 2:
                    # Lines with data.
                    if line.startswith("For the susceptibility"): continue

                    if line.startswith(memory_pre):
                        dims["mem_per_proc_mb"] = float(line.replace(memory_pre, "").split()[0])
                    elif line.startswith(filesizes_pre):
                        tokens = line.split()
                        mbpos = [i - 1 for i, t in enumerate(tokens) if t.startswith("Mbytes")]
                        assert len(mbpos) == 2
                        dims["wfk_size_mb"] = float(tokens[mbpos[0]])
                        dims["denpot_size_mb"] = float(tokens[mbpos[1]])
                    elif line.startswith("Pmy_natom="):
                        dims.update(my_natom=int(line.replace("Pmy_natom=", "").strip()))
                        #print("my_natom", dims["my_natom"])
                    else:
                        if line and line[0] == "-":
                            line = line[1:]
                        tokens = grouper(2, line.replace("=", "").split())
                        if verbose > 1: print("tokens:", tokens)
                        dims.update([(t[0], int(t[1])) for t in tokens])

        return dims_dataset, spginfo_dataset
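    # Illustrative sketch (not part of the original module): the dimensions parsed from the
    # memory-estimate section can be inspected as a pandas DataFrame indexed by dataset.
    # The path and the selected column names are placeholders and depend on the run.
    #
    #   abo = AbinitOutputFile.from_file("run.abo")
    #   df = abo.get_dims_spginfo_dataframe()
    #   print(df[["natom", "nkpt", "mpw", "mem_per_proc_mb"]])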
    def next_gs_scf_cycle(self) -> GroundStateScfCycle:
        """
        Return the next :class:`GroundStateScfCycle` in the file. None if not found.
        """
        return GroundStateScfCycle.from_stream(self)

    def get_all_gs_scf_cycles(self) -> list[GroundStateScfCycle]:
        """Return list of :class:`GroundStateScfCycle` objects. Empty list if no entry is found."""
        # NOTE: get_all should not be used together with next because of the call to self.seek(0).
        # The API should be refactored.
        cycles = []
        self.seek(0)
        while True:
            cycle = self.next_gs_scf_cycle()
            if cycle is None: break
            cycles.append(cycle)

        self.seek(0)
        return cycles
    def next_d2de_scf_cycle(self) -> D2DEScfCycle:
        """
        Return :class:`D2DEScfCycle` with information on the DFPT iterations. None if not found.
        """
        return D2DEScfCycle.from_stream(self)

    def get_all_d2de_scf_cycles(self) -> list[D2DEScfCycle]:
        """Return list of :class:`D2DEScfCycle` objects. Empty list if no entry is found."""
        cycles = []
        self.seek(0)
        while True:
            cycle = self.next_d2de_scf_cycle()
            if cycle is None: break
            cycles.append(cycle)
        return cycles
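    # Illustrative sketch (not part of the original module): iterate over the GS SCF cycles
    # found in the output and plot the convergence of each one. The path is a placeholder.
    #
    #   abo = AbinitOutputFile.from_file("run.abo")
    #   for i, cycle in enumerate(abo.get_all_gs_scf_cycles()):
    #       cycle.plot(title=f"SCF cycle {i}", show=True)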
    def plot(self, tight_layout=True, with_timer=False, show=True):
        """
        Plot GS/DFPT SCF cycles and timer data found in the output file.

        Args:
            with_timer: True if timer section should be plotted
        """
        from abipy.tools.plotting import MplExposer #, PanelExposer

        with MplExposer(slide_mode=False, slide_timeout=5.0) as e:
            e(self.yield_figs(tight_layout=tight_layout, with_timer=with_timer))
    # TODO: Use header and vars to understand if we have SCF/DFPT/Relaxation
    def yield_figs(self, **kwargs):  # pragma: no cover
        """
        This function *generates* a predefined list of matplotlib figures with minimal input from the user.
        """
        tight_layout = kwargs.pop("tight_layout", False)
        with_timer = kwargs.pop("with_timer", True)

        for icycle, cycle in enumerate(self.get_all_gs_scf_cycles()):
            yield cycle.plot(title=f"SCF cycle {icycle}", tight_layout=tight_layout, show=False)

        for icycle, cycle in enumerate(self.get_all_d2de_scf_cycles()):
            yield cycle.plot(title=f"DFPT cycle {icycle}", tight_layout=tight_layout, show=False)

        if with_timer:
            self.seek(0)
            try:
                yield self.get_timer().plot_all(tight_layout=tight_layout, show=False)
            except Exception:
                print("Abinit output file does not contain timopt data")
    def yield_plotly_figs(self, **kwargs):  # pragma: no cover
        """
        This function *generates* a predefined list of plotly figures with minimal input from the user.
        """
        with_timer = kwargs.pop("with_timer", True)

        for icycle, cycle in enumerate(self.get_all_gs_scf_cycles()):
            yield cycle.plotly(title=f"SCF cycle {icycle}", show=False)

        for icycle, cycle in enumerate(self.get_all_d2de_scf_cycles()):
            yield cycle.plotly(title=f"DFPT cycle {icycle}", show=False)

        #if with_timer:
        #    self.seek(0)
        #    try:
        #        yield self.get_timer().plot_all(tight_layout=tight_layout, show=False)
        #    except Exception:
        #        print("Abinit output files does not contain timopt data")
    def compare_gs_scf_cycles(self, others, show=True) -> list[Figure]:
        """
        Produce and return a list of matplotlib_ figures comparing the GS self-consistent
        cycle in self with the ones in others.

        Args:
            others: list of :class:`AbinitOutputFile` objects or strings with paths to output files.
            show: True to display plots.
        """
        # Open file here if we receive a string. Files will be closed before returning.
        close_files = []
        for i, other in enumerate(others):
            if is_string(other):
                others[i] = self.__class__.from_file(other)
                close_files.append(i)

        fig, figures = None, []
        while True:
            cycle = self.next_gs_scf_cycle()
            if cycle is None: break

            fig = cycle.plot(show=False)
            for i, other in enumerate(others):
                other_cycle = other.next_gs_scf_cycle()
                if other_cycle is None: break
                last = (i == len(others) - 1)
                fig = other_cycle.plot(ax_list=fig.axes, show=show and last)
                if last: fig.tight_layout()
            figures.append(fig)

        self.seek(0)
        for other in others: other.seek(0)

        if close_files:
            for i in close_files: others[i].close()

        return figures

    def compare_d2de_scf_cycles(self, others, show=True) -> list[Figure]:
        """
        Produce and return a list of matplotlib_ figures comparing the DFPT self-consistent
        cycle in self with the ones in others.

        Args:
            others: list of :class:`AbinitOutputFile` objects or strings with paths to output files.
            show: True to display plots.
        """
        # Open file here if we receive a string. Files will be closed before returning.
        close_files = []
        for i, other in enumerate(others):
            if is_string(other):
                others[i] = self.__class__.from_file(other)
                close_files.append(i)

        fig, figures = None, []
        while True:
            cycle = self.next_d2de_scf_cycle()
            if cycle is None: break

            fig = cycle.plot(show=False)
            for i, other in enumerate(others):
                other_cycle = other.next_d2de_scf_cycle()
                if other_cycle is None: break
                last = (i == len(others) - 1)
                fig = other_cycle.plot(ax_list=fig.axes, show=show and last)
                if last: fig.tight_layout()
            figures.append(fig)

        self.seek(0)
        for other in others: other.seek(0)

        if close_files:
            for i in close_files: others[i].close()

        return figures
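    # Illustrative sketch (not part of the original module): compare the GS SCF convergence
    # of this output with another calculation. Both paths are placeholders.
    #
    #   abo = AbinitOutputFile.from_file("run1.abo")
    #   figs = abo.compare_gs_scf_cycles(["run2.abo"], show=True)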
    def get_panel(self, **kwargs):
        """
        Build panel with widgets to interact with the Abinit output file
        either in a notebook or in a panel app.
        """
        from abipy.panels.outputs import AbinitOutputFilePanel
        return AbinitOutputFilePanel(self).get_panel(**kwargs)
    def write_notebook(self, nbpath=None) -> str:
        """
        Write a jupyter_ notebook to nbpath. If ``nbpath`` is None, a temporary file in the current
        working directory is created. Return path to the notebook.
        """
        nbformat, nbv, nb = self.get_nbformat_nbv_nb(title=None)

        nb.cells.extend([
            nbv.new_code_cell("abo = abilab.abiopen('%s')" % self.filepath),
            nbv.new_code_cell("print(abo.events)"),
            nbv.new_code_cell("abo.plot()"),
        ])

        return self._write_nb_nbpath(nb, nbpath)
def validate_output_parser(abitests_dir=None, output_files=None) -> int:  # pragma: no cover
    """
    Validate/test Abinit output parser.

    Args:
        abitests_dir: Abinit tests directory.
        output_files: List of Abinit output files.

    Return: Exit code.
    """
    def is_abinit_output(path):
        """
        True if path is one of the output files used in the Abinit Test suite.
        """
        # Accept files with either the .abo or the .out extension.
        if not path.endswith((".abo", ".out")): return False

        with open(path, "rt") as fh:
            for i, line in enumerate(fh):
                if i == 1:
                    return line.rstrip().lower().endswith("abinit")
            return False

    # Files are collected in paths.
    paths = []

    if abitests_dir is not None:
        print("Analyzing directory %s for input files" % abitests_dir)

        for dirpath, dirnames, filenames in os.walk(abitests_dir):
            for fname in filenames:
                path = os.path.join(dirpath, fname)
                if is_abinit_output(path): paths.append(path)

    if output_files is not None:
        print("Analyzing files:", str(output_files))
        for arg in output_files:
            if is_abinit_output(arg): paths.append(arg)

    nfiles = len(paths)
    if nfiles == 0:
        cprint("Empty list of input files.", "red")
        return 0

    print("Found %d Abinit output files" % len(paths))
    errpaths = []
    for path in paths:
        print(path + ": ", end="")
        try:
            out = AbinitOutputFile.from_file(path)
            s = out.to_string(verbose=2)
            assert out.run_completed
            cprint("OK", "green")
        except Exception as exc:
            if not isinstance(exc, NotImplementedError):
                cprint("FAILED", "red")
                errpaths.append(path)
                import traceback
                print(traceback.format_exc())
                #print("[%s]: Exception:\n%s" % (path, str(exc)))
                #with open(path, "rt") as fh:
                #    print(10*"=" + "Input File" + 10*"=")
                #    print(fh.read())
                #    print()
            else:
                cprint("NOTIMPLEMENTED", "magenta")

    if errpaths:
        cprint("failed: %d/%d [%.1f%%]" % (len(errpaths), nfiles, 100 * len(errpaths)/nfiles), "red")
        for i, epath in enumerate(errpaths):
            cprint("[%d] %s" % (i, epath), "red")
    else:
        cprint("All input files successfully parsed!", "green")

    return len(errpaths)
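# Illustrative sketch (not part of the original module): validate_output_parser can be
# pointed at a local copy of the ABINIT test suite; the directory below is a placeholder.
#
#   retcode = validate_output_parser(abitests_dir="~abinit/tests")
#   print("Number of files that could not be parsed:", retcode)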
class AboRobot(Robot):
    """
    This robot analyzes the results contained in multiple Abinit output files.
    Can compare dimensions, SCF cycles, analyze timers.

    .. rubric:: Inheritance Diagram
    .. inheritance-diagram:: AboRobot
    """
    EXT = "abo"

    def get_dims_dataframe(self, with_time=True, index=None) -> pd.DataFrame:
        """
        Build and return a |pandas-DataFrame| with the dimensions of the calculation.

        Args:
            with_time: True if walltime and cputime should be added
            index: Index of the dataframe. Use relative paths of files if None.
        """
        rows, my_index = [], []
        for i, abo in enumerate(self.abifiles):
            try:
                dims_dataset, spg_dataset = abo.get_dims_spginfo_dataset()
            except Exception as exc:
                cprint("Exception while trying to get dimensions from %s\n%s" % (abo.relpath, str(exc)), "yellow")
                continue

            for dtindex, dims in dims_dataset.items():
                dims = dims.copy()
                dims.update({"dtset": dtindex})
                # Add walltime and cputime in seconds
                if with_time:
                    dims.update(OrderedDict([(k, getattr(abo, k)) for k in
                        ("overall_cputime", "proc0_cputime", "overall_walltime", "proc0_walltime")]))
                rows.append(dims)
                my_index.append(abo.relpath if index is None else index[i])

        return pd.DataFrame(rows, index=my_index, columns=list(rows[0].keys()))
    def get_dataframe(self, with_geo=True, with_dims=True, abspath=False, funcs=None) -> pd.DataFrame:
        """
        Return a |pandas-DataFrame| with the most important results and the filenames as index.

        Args:
            with_geo: True if structure info should be added to the dataframe
            with_dims: True if dimensions should be added
            abspath: True if paths in index should be absolute. Default: Relative to getcwd().
            funcs: Function or list of functions to execute to add more data to the DataFrame.
                Each function receives an :class:`AbinitOutputFile` object and returns a tuple
                (key, value) where key is a string with the name of column and value is the value
                to be inserted.
        """
        rows, row_names = [], []
        for label, abo in self.items():
            row_names.append(label)
            d = OrderedDict()
            if with_dims:
                dims_dataset, spg_dataset = abo.get_dims_spginfo_dataset()
                if len(dims_dataset) > 1:
                    cprint("Multiple datasets are not supported. ARGH!", "yellow")
                d.update(dims_dataset[1])

            # Add info on structure.
            if with_geo and abo.run_completed:
                d.update(abo.final_structure.get_dict4pandas(with_spglib=True))

            # Execute functions
            if funcs is not None: d.update(self._exec_funcs(funcs, abo))
            rows.append(d)

        row_names = row_names if not abspath else self._to_relpaths(row_names)
        return pd.DataFrame(rows, index=row_names, columns=list(rows[0].keys()))
    def get_time_dataframe(self) -> pd.DataFrame:
        """
        Return a |pandas-DataFrame| with the wall-time, cpu-time in seconds and the filenames as index.
        """
        rows, row_names = [], []

        for label, abo in self.items():
            row_names.append(label)
            d = OrderedDict([(k, getattr(abo, k)) for k in
                ("overall_cputime", "proc0_cputime", "overall_walltime", "proc0_walltime")])
            rows.append(d)

        return pd.DataFrame(rows, index=row_names, columns=list(rows[0].keys()))
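    # Illustrative sketch (not part of the original module): build a robot from a list of
    # output files (placeholder paths) and compare dimensions and timings.
    #
    #   robot = AboRobot.from_files(["run1.abo", "run2.abo"])
    #   print(robot.get_dims_dataframe())
    #   print(robot.get_time_dataframe())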
    # TODO
    #def gridplot_timer(self)

    def yield_figs(self, **kwargs):  # pragma: no cover
        """
        This function *generates* a predefined list of matplotlib figures with minimal input from the user.
        """
        yield None
    def write_notebook(self, nbpath=None) -> str:
        """
        Write a jupyter_ notebook to nbpath. If nbpath is None, a temporary file in the current
        working directory is created. Return path to the notebook.
        """
        nbformat, nbv, nb = self.get_nbformat_nbv_nb(title=None)

        args = [(l, f.filepath) for l, f in self.items()]
        nb.cells.extend([
            #nbv.new_markdown_cell("# This is a markdown cell"),
            nbv.new_code_cell("robot = abilab.AboRobot(*%s)\nrobot.trim_paths()\nrobot" % str(args)),
            nbv.new_code_cell("# robot.get_dims_dataframe()"),
            nbv.new_code_cell("robot.get_dataframe()"),
        ])

        # Mixins
        nb.cells.extend(self.get_baserobot_code_cells())

        return self._write_nb_nbpath(nb, nbpath)
class OutNcFile(AbinitNcFile):
    """
    Class representing the _OUT.nc file containing the dataset results
    produced at the end of the run. The netcdf variables can be accessed
    via instance attribute e.g. ``outfile.ecut``. Provides integration with ipython_.
    """
    # TODO: This object is deprecated

    def __init__(self, filepath: str):
        super().__init__(filepath)
        self.reader = NetcdfReader(filepath)
        self._varscache = {k: None for k in self.reader.rootgrp.variables}

    def __dir__(self):
        """Ipython integration."""
        return sorted(list(self._varscache.keys()))

    def __getattribute__(self, name):
        try:
            return super().__getattribute__(name)
        except AttributeError:
            # Look in self._varscache
            varscache = super().__getattribute__("_varscache")
            if name not in varscache:
                raise AttributeError("Cannot find attribute %s" % name)
            reader = super().__getattribute__("reader")
            if varscache[name] is None:
                varscache[name] = reader.read_value(name)
            return varscache[name]

    @lazy_property
    def params(self) -> dict:
        """dict with parameters that might be subject to convergence studies."""
        return {}

    def close(self) -> None:
        """Close the file."""
        self.reader.close()

    def get_allvars(self):
        """
        Read all netcdf_ variables present in the file.
        Return dictionary varname --> value
        """
        for k, v in self._varscache.items():
            if v is not None: continue
            self._varscache[k] = self.reader.read_value(k)
        return self._varscache
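# Illustrative sketch (not part of the original module): netcdf variables stored in the
# _OUT.nc file can be accessed as attributes; the path and the variable name are
# placeholders and depend on the run.
#
#   out = OutNcFile("run_OUT.nc")
#   print(out.ecut)     # value of the `ecut` netcdf variable, if present
#   out.close()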